4. MASO Controls¶
After this module you will be able to¶
- Identify which MASO control domains require engineering implementation vs. policy configuration
- Implement concrete control patterns for Execution Control, Observability, and Data Protection
- Integrate MASO controls with LangGraph, AutoGen, and CrewAI agent frameworks
- Build circuit breaker and PACE resilience patterns for agent chains
- Map each control to the specific failure mode it addresses from Module 1
MASO from an engineering perspective¶
The AIRS framework defines MASO (Multi-Agent Security Operations) as controls across eight domains. As an engineering lead, you don't need to implement every control on day one. You need to know which domains require you to build something, which require you to configure something, and which require you to integrate with something your security team provides.
| Domain | Engineering work | Your role |
|---|---|---|
| 1. Prompt, Goal & Epistemic Integrity | Build verification receipts (Module 3), retrieval completeness checks | Build |
| 2. Identity & Access | Integrate with IAM; scope agent permissions | Configure |
| 3. Data Protection | Build data boundary enforcement; control data flow between agents | Build |
| 4. Execution Control | Build circuit breakers, PACE patterns, delegation limits | Build |
| 5. Observability | Build chain-level metrics, integrity dashboards, alerting | Build |
| 6. Supply Chain | Integrate model provenance checks; pin versions | Configure |
| 7. Privileged Agent Governance | Build elevated oversight for agents with high-impact permissions | Build + Configure |
| 8. Objective Intent | Build OISpec schemas, tactical/strategic evaluation, judge monitoring | Build |
This module focuses on the domains where you build: Epistemic Integrity (covered in Module 3), Data Protection, Execution Control, Observability, and Objective Intent. Module 5 goes deeper on Observability with specific instrumentation patterns.
Execution Control: what you build¶
Execution control is the set of runtime mechanisms that stop, pause, or redirect an agent chain when something goes wrong. In traditional systems, you have circuit breakers, retries, and fallbacks. Agent chains need the same patterns, adapted for the specific failure modes from Module 1.
Pattern 1: The chain circuit breaker¶
A circuit breaker for agent chains works differently from a traditional circuit breaker. Traditional circuit breakers trip on error rates. Agent chain circuit breakers trip on integrity signals, because the failures you're catching don't produce errors.
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
class BreakerState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Chain halted
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class ChainCircuitBreaker:
"""Circuit breaker that trips on integrity failures,
not just errors."""
failure_threshold: int = 3
recovery_timeout: timedelta = timedelta(minutes=5)
state: BreakerState = BreakerState.CLOSED
failure_count: int = 0
last_failure: datetime = field(
default_factory=lambda: datetime.min
)
def check_receipt(self, receipt) -> bool:
"""Evaluate a verification receipt.
Returns True if the chain should proceed."""
if self.state == BreakerState.OPEN:
if datetime.utcnow() - self.last_failure > self.recovery_timeout:
self.state = BreakerState.HALF_OPEN
else:
return False # Chain blocked
if not receipt.integrity_pass:
self.failure_count += 1
self.last_failure = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = BreakerState.OPEN
return False # Trip the breaker
return False # Block this request but don't trip
# Receipt passed
if self.state == BreakerState.HALF_OPEN:
self.state = BreakerState.CLOSED
self.failure_count = 0
return True
The critical difference from a traditional circuit breaker: this one evaluates verification receipts, not HTTP status codes. A request that returns 200 OK with a failed integrity check will trip the breaker.
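To make the trip behaviour concrete, here is a hypothetical usage sketch: a condensed copy of the breaker above driven by stub receipts (FakeReceipt is an illustrative stand-in for the verification-receipt type from Module 3, not part of any framework).

```python
# Condensed breaker plus stub receipts, showing the trip at the third
# consecutive integrity failure. FakeReceipt is an assumed stand-in.
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum

class BreakerState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class FakeReceipt:
    integrity_pass: bool

@dataclass
class ChainCircuitBreaker:
    failure_threshold: int = 3
    recovery_timeout: timedelta = timedelta(minutes=5)
    state: BreakerState = BreakerState.CLOSED
    failure_count: int = 0
    last_failure: datetime = field(default_factory=lambda: datetime.min)

    def check_receipt(self, receipt) -> bool:
        if self.state == BreakerState.OPEN:
            if datetime.utcnow() - self.last_failure > self.recovery_timeout:
                self.state = BreakerState.HALF_OPEN
            else:
                return False  # chain blocked while breaker is open
        if not receipt.integrity_pass:
            self.failure_count += 1
            self.last_failure = datetime.utcnow()
            if self.failure_count >= self.failure_threshold:
                self.state = BreakerState.OPEN
            return False
        if self.state == BreakerState.HALF_OPEN:
            self.state = BreakerState.CLOSED
            self.failure_count = 0
        return True

breaker = ChainCircuitBreaker()
# Two failures: each request is blocked, but the breaker stays CLOSED
breaker.check_receipt(FakeReceipt(integrity_pass=False))
breaker.check_receipt(FakeReceipt(integrity_pass=False))
state_before_trip = breaker.state
# Third failure trips the breaker
breaker.check_receipt(FakeReceipt(integrity_pass=False))
state_after_trip = breaker.state
# Even a passing receipt is blocked until the recovery timeout elapses
blocked = breaker.check_receipt(FakeReceipt(integrity_pass=True))
```

Note that a passing receipt after the trip is still rejected: recovery only begins once the timeout expires and the breaker moves to HALF_OPEN.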
Pattern 2: PACE resilience¶
PACE (Primary, Alternate, Contingency, Emergency) is a resilience pattern from the AIRS framework that gives agent chains structured fallback paths: the primary path is the normal flow, the alternate retries with modified parameters, the contingency escalates to a human, and the emergency hard-blocks the chain and alerts.
Here is a concrete implementation:
from dataclasses import dataclass
from typing import Callable

class ChainBlockedError(Exception):
    """Raised when the PACE emergency path halts a chain."""
@dataclass
class PACEConfig:
alternate_fn: Callable # Retry with different params
contingency_fn: Callable # Human escalation
emergency_fn: Callable # Hard block + alert
max_alternate_retries: int = 2
async def execute_with_pace(
primary_fn: Callable,
pace: PACEConfig,
chain_context: dict,
) -> dict:
"""Execute an agent step with PACE fallback."""
# Primary path
result = await primary_fn(chain_context)
if result.receipt.integrity_pass:
result.path_used = "primary"
return result
# Alternate path: retry with modifications
for attempt in range(pace.max_alternate_retries):
alt_context = {
**chain_context,
"force_full_retrieval": True,
"bypass_cache": True,
"attempt": attempt + 1,
}
result = await pace.alternate_fn(alt_context)
if result.receipt.integrity_pass:
result.path_used = f"alternate (attempt {attempt + 1})"
return result
# Contingency path: human review
result.path_used = "contingency"
human_decision = await pace.contingency_fn(result)
if human_decision.approved:
result.receipt.human_override = True
return result
# Emergency path: hard block
result.path_used = "emergency"
await pace.emergency_fn(result, chain_context)
raise ChainBlockedError(
f"Chain {chain_context['chain_id']} blocked by PACE emergency path"
)
Why PACE matters: Without PACE, a failed integrity check leaves you with two options: proceed anyway or hard-fail. PACE gives you graduated responses. The alternate path often resolves transient issues (cache staleness, temporary retrieval limits). The contingency path handles genuine edge cases. The emergency path is your last resort. Most importantly, every path is logged and auditable.
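The graduated fall-through can be demonstrated with a self-contained sketch. This is synchronous and uses stub path functions for brevity; run_with_pace and StubResult are illustrative names, not part of the implementation above.

```python
# Minimal PACE fall-through: primary -> alternate (with retries) ->
# contingency -> emergency, stopping at the first path that succeeds.
from dataclasses import dataclass

@dataclass
class StubResult:
    integrity_pass: bool
    path_used: str = ""

def run_with_pace(primary, alternate, contingency_approves, max_retries=2):
    result = primary()
    if result.integrity_pass:
        result.path_used = "primary"
        return result
    for attempt in range(max_retries):
        result = alternate()
        if result.integrity_pass:
            result.path_used = f"alternate (attempt {attempt + 1})"
            return result
    if contingency_approves:  # human reviewer signed off
        result.path_used = "contingency"
        return result
    raise RuntimeError("blocked by PACE emergency path")

# Primary fails, the first alternate retry succeeds
r = run_with_pace(
    primary=lambda: StubResult(integrity_pass=False),
    alternate=lambda: StubResult(integrity_pass=True),
    contingency_approves=False,
)

# All paths fail: the emergency path raises and the chain halts
try:
    run_with_pace(
        primary=lambda: StubResult(integrity_pass=False),
        alternate=lambda: StubResult(integrity_pass=False),
        contingency_approves=False,
    )
    emergency_raised = False
except RuntimeError:
    emergency_raised = True
```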
Pattern 3: Delegation depth limits¶
To address the constraint dilution problem from Module 1, enforce hard limits on delegation chains:
@dataclass
class DelegationPolicy:
max_depth: int = 3
allowed_sub_agents: list[str] = field(default_factory=list)
constraint_propagation: bool = True # Sub-agents inherit constraints
def check_delegation(
current_depth: int,
target_agent: str,
policy: DelegationPolicy,
original_constraints: dict,
) -> tuple[bool, dict]:
"""Check whether a delegation is permitted and return
the constraints to propagate."""
if current_depth >= policy.max_depth:
return False, {}
if target_agent not in policy.allowed_sub_agents:
return False, {}
if policy.constraint_propagation:
return True, original_constraints
else:
return True, {} # Constraints not propagated (risky)
The constraint_propagation flag is critical. When set to True, the original request's constraints (user preferences, safety requirements, data access scopes) are passed to every sub-agent in the delegation chain. This directly addresses the "telephone game" problem from Module 1.
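An end-to-end check of the policy makes the depth limit visible. This sketch re-declares the policy and check from Pattern 3 so it runs standalone; the agent names and constraint keys are hypothetical.

```python
# Delegation policy demo: hops within the depth limit succeed and carry
# the original constraints; the hop at max_depth and an unlisted agent
# are both rejected.
from dataclasses import dataclass, field

@dataclass
class DelegationPolicy:
    max_depth: int = 3
    allowed_sub_agents: list[str] = field(default_factory=list)
    constraint_propagation: bool = True

def check_delegation(current_depth, target_agent, policy, original_constraints):
    if current_depth >= policy.max_depth:
        return False, {}
    if target_agent not in policy.allowed_sub_agents:
        return False, {}
    if policy.constraint_propagation:
        return True, original_constraints
    return True, {}

policy = DelegationPolicy(
    allowed_sub_agents=["enricher", "checker", "approver"]  # hypothetical
)
constraints = {"data_scope": "equities_only", "pii_allowed": False}

ok_0, c0 = check_delegation(0, "enricher", policy, constraints)
ok_2, c2 = check_delegation(2, "approver", policy, constraints)
ok_3, _ = check_delegation(3, "approver", policy, constraints)       # depth limit
ok_bad, _ = check_delegation(1, "unknown_agent", policy, constraints)  # not allowed
```

Because constraint_propagation defaults to True, c0 and c2 are the original constraint dict, unchanged, at every permitted hop.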
Data Protection: what you build¶
Data Protection in the MASO framework goes beyond encryption and access control. For multi-agent systems, it means controlling what data crosses agent boundaries and ensuring data integrity is maintained throughout the chain.
Pattern 4: Agent boundary data contracts¶
Define explicit contracts for what data can flow between agents:
@dataclass
class AgentBoundaryContract:
"""Defines what data Agent A can send to Agent B."""
source_agent: str
target_agent: str
allowed_fields: list[str]
required_fields: list[str]
max_payload_tokens: int
pii_allowed: bool = False
must_include_receipt: bool = True
def enforce_boundary(
output: AgentOutput,
contract: AgentBoundaryContract,
) -> AgentOutput:
"""Filter and validate agent output against the boundary
contract before passing to the next agent."""
if contract.must_include_receipt and output.receipt is None:
raise BoundaryViolation("Receipt required but not present")
# Check payload size
token_count = count_tokens(output.content)
if token_count > contract.max_payload_tokens:
raise BoundaryViolation(
f"Payload {token_count} tokens exceeds "
f"limit {contract.max_payload_tokens}"
)
# Check required fields
for field_name in contract.required_fields:
if field_name not in output.structured_fields:
raise BoundaryViolation(
f"Required field '{field_name}' missing"
)
# PII check
if not contract.pii_allowed and contains_pii(output.content):
raise BoundaryViolation("PII detected in non-PII boundary")
return output
Boundary contracts serve two purposes:
- Security: They prevent uncontrolled data flow between agents (PII leakage, scope creep)
- Integrity: The must_include_receipt flag ensures that verification receipts are never dropped as data moves through the chain
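A runnable sketch of boundary enforcement follows, with deliberately naive stand-ins: whitespace-split token counting and a toy SSN regex for PII detection. A real deployment would use the model tokenizer and a proper PII classifier.

```python
# Boundary enforcement demo with naive count_tokens / contains_pii stubs.
import re
from dataclasses import dataclass, field

class BoundaryViolation(Exception):
    pass

@dataclass
class AgentBoundaryContract:
    source_agent: str
    target_agent: str
    required_fields: list[str]
    max_payload_tokens: int
    pii_allowed: bool = False

@dataclass
class AgentOutput:
    content: str
    structured_fields: dict = field(default_factory=dict)

def count_tokens(text):  # naive stand-in for a real tokenizer
    return len(text.split())

def contains_pii(text):  # toy SSN pattern only
    return re.search(r"\b\d{3}-\d{2}-\d{4}\b", text) is not None

def enforce_boundary(output, contract):
    if count_tokens(output.content) > contract.max_payload_tokens:
        raise BoundaryViolation("payload too large")
    for name in contract.required_fields:
        if name not in output.structured_fields:
            raise BoundaryViolation(f"missing field {name!r}")
    if not contract.pii_allowed and contains_pii(output.content):
        raise BoundaryViolation("PII detected in non-PII boundary")
    return output

contract = AgentBoundaryContract(
    source_agent="enricher", target_agent="checker",  # hypothetical agents
    required_fields=["compliance_status"], max_payload_tokens=50,
)
clean = enforce_boundary(
    AgentOutput("status computed", {"compliance_status": "FLAGGED"}), contract
)
try:
    enforce_boundary(
        AgentOutput("SSN 123-45-6789", {"compliance_status": "CLEAR"}), contract
    )
    pii_blocked = False
except BoundaryViolation:
    pii_blocked = True
```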
Pattern 5: Retrieval scope enforcement¶
To prevent the truncated retrieval problem, enforce retrieval scope at the data layer:
async def scoped_retrieval(
query: str,
source_id: str,
scope_config: dict,
) -> tuple[list, DataSourceAccess]:
"""Perform a retrieval with scope enforcement.
Returns results and a DataSourceAccess record for the receipt."""
# Get expected count for this source
expected_count = await get_expected_count(source_id, query)
# Perform retrieval
results = await vector_store.query(
query=query,
limit=scope_config.get("max_results", 1000),
)
# Build the data source access record
access = DataSourceAccess(
source_id=source_id,
query=query,
expected_count=expected_count,
actual_count=len(results),
freshness=datetime.utcnow(),
truncated=len(results) < expected_count,
        truncation_reason=(
            None if len(results) >= expected_count
            else "result_limit"
            if len(results) >= scope_config.get("max_results", 1000)
            else "context_window_limit"
        ),
)
return results, access
The key insight: the retrieval function itself computes the completeness metadata. The agent doesn't need to know what "complete" means; the data layer enforces it and reports the results in a format the verification receipt can consume.
Integration with agent frameworks¶
The patterns above are framework-agnostic. Here is how they map to common agent frameworks.
LangGraph integration¶
LangGraph uses a state graph model where nodes are agent steps and edges are transitions. MASO controls integrate at the edge level:
from langgraph.graph import StateGraph
def build_controlled_graph():
graph = StateGraph(ChainState)
# Add agent nodes
graph.add_node("agent_a", agent_a_node)
graph.add_node("agent_b", agent_b_node)
graph.add_node("agent_c", agent_c_node)
# Add integrity check nodes between agents
graph.add_node("check_a_to_b", integrity_check_node)
graph.add_node("check_b_to_c", integrity_check_node)
# Add PACE fallback nodes
graph.add_node("alternate_b", alternate_retrieval_node)
graph.add_node("escalate", human_escalation_node)
graph.add_node("emergency_halt", emergency_halt_node)
    # Wire the graph: agent -> check -> next agent or fallback
    graph.set_entry_point("agent_a")
    graph.add_edge("agent_a", "check_a_to_b")
graph.add_conditional_edges(
"check_a_to_b",
route_on_integrity,
{
"pass": "agent_b",
"fail_retry": "alternate_b",
"fail_escalate": "escalate",
"fail_block": "emergency_halt",
},
)
graph.add_edge("agent_b", "check_b_to_c")
graph.add_conditional_edges(
"check_b_to_c",
route_on_integrity,
{
"pass": "agent_c",
"fail_retry": "alternate_b",
"fail_escalate": "escalate",
"fail_block": "emergency_halt",
},
)
return graph.compile()
The integrity check sits on the graph edge. The conditional routing implements PACE: pass goes to the next agent, fail_retry goes to the alternate path, fail_escalate goes to contingency, fail_block goes to emergency.
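The route_on_integrity function is referenced in the graph code but not shown. Here is a plausible sketch, assuming the chain state carries the latest receipt, a breaker flag, and a retry counter; the state keys and the retry threshold are assumptions.

```python
# Map the latest verification receipt onto one of the four PACE routes
# used by the conditional edges above.
def route_on_integrity(state: dict) -> str:
    receipt = state["latest_receipt"]
    if receipt["integrity_pass"]:
        return "pass"
    if state.get("breaker_open"):
        return "fail_block"      # emergency: circuit breaker has tripped
    if state.get("retries_used", 0) < 2:
        return "fail_retry"      # alternate: retry with full retrieval
    return "fail_escalate"       # contingency: human review

route_ok = route_on_integrity({"latest_receipt": {"integrity_pass": True}})
route_retry = route_on_integrity(
    {"latest_receipt": {"integrity_pass": False}, "retries_used": 0}
)
route_block = route_on_integrity(
    {"latest_receipt": {"integrity_pass": False}, "breaker_open": True}
)
```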
AutoGen integration¶
AutoGen uses a conversational model where agents communicate via messages. MASO controls integrate as message interceptors:
from autogen import ConversableAgent
class IntegrityCheckingAgent(ConversableAgent):
"""Wrapper that adds receipt verification to any AutoGen agent."""
def __init__(self, inner_agent, breaker, **kwargs):
super().__init__(**kwargs)
self.inner_agent = inner_agent
self.breaker = breaker
async def a_receive(self, message, sender, **kwargs):
# Check incoming receipt
receipt = message.get("_verification_receipt")
if receipt and not self.breaker.check_receipt(receipt):
return {
"content": "CHAIN_HALTED: Upstream integrity check failed",
"_chain_status": "halted",
"_halt_reason": receipt.integrity_verdict,
}
# Process normally
response = await self.inner_agent.a_receive(
message, sender, **kwargs
)
# Attach outgoing receipt
response["_verification_receipt"] = self.generate_receipt(
message, response
)
return response
CrewAI integration¶
CrewAI uses a task-based model where agents are assigned tasks with specific tools. MASO controls integrate at the task execution level:
from crewai import Task
def create_controlled_task(
agent,
description,
integrity_config,
):
"""Create a CrewAI task with integrity controls."""
original_task = Task(
description=description,
agent=agent,
)
# Wrap the task execution with integrity checking
original_execute = original_task.execute
async def controlled_execute(*args, **kwargs):
result = await original_execute(*args, **kwargs)
# Generate receipt
receipt = generate_receipt_from_task(
original_task, result, integrity_config
)
# Check integrity
if not receipt.integrity_pass:
return handle_integrity_failure(
result, receipt, integrity_config
)
result.receipt = receipt
return result
original_task.execute = controlled_execute
return original_task
Framework integration principle: Regardless of the framework, MASO controls insert at agent boundaries, the points where data flows from one agent to another. The control checks the verification receipt, enforces boundary contracts, and routes to PACE fallback paths when integrity checks fail. The agents themselves don't need to be modified; the controls operate in the orchestration layer.
Objective Intent: what you build¶
The patterns above catch mechanical faults: incomplete retrieval, boundary violations, integrity failures. They do not evaluate whether the system is doing what the developer designed it to do. That requires Objective Intent, the newest MASO domain.
What you implement¶
As an engineering lead, Objective Intent means building three things:
1. OISpec schemas. Every agent, judge, and workflow gets a structured, version-controlled Objective Intent Specification. This is a typed JSON schema (not free-form prose) that declares the component's goal, success/failure criteria, permitted tools, prohibited actions, data scope, and authority limits.
@dataclass
class AgentOISpec:
"""Objective Intent Specification for a single agent."""
agent_id: str
goal: str
success_criteria: list[str]
failure_criteria: list[str]
permitted_tools: list[str]
prohibited_actions: list[str]
data_scope: str
authority: dict # can_delegate, max_depth, can_create_agents
risk_classification: str # low, medium, high, critical
evaluation_frequency: str # every_action, per_phase, post_execution
version: int
oisspec_hash: str # sha256 for integrity verification
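The oisspec_hash field can be computed over a canonical JSON serialization of the spec, so any mutation of a deployed spec is detectable. A minimal sketch follows; the exact canonicalization scheme (sorted-key compact JSON) is an assumption, not mandated by the framework.

```python
# Integrity hash over the spec body, excluding the hash field itself.
import hashlib
import json

def compute_oisspec_hash(spec_fields: dict) -> str:
    """sha256 over sorted-key, compact JSON of the spec body."""
    body = {k: v for k, v in spec_fields.items() if k != "oisspec_hash"}
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

spec = {
    "agent_id": "compliance_checker",   # illustrative values
    "goal": "flag restricted trades",
    "version": 1,
}
h1 = compute_oisspec_hash(spec)
h2 = compute_oisspec_hash({**spec, "version": 2})  # any change -> new hash
```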
2. Tactical evaluation. A per-agent judge that evaluates each agent's actions against its OISpec. This integrates at the same agent boundaries where you already insert receipt checks and boundary enforcement.
async def evaluate_against_oisspec(
    agent_output: AgentOutput,
    oisspec: AgentOISpec,
    judge_model,  # judge client exposing an async evaluate() method
) -> OISpecVerdict:
    """Tactical evaluation: does this output comply
    with the agent's declared intent?"""
    verdict = await judge_model.evaluate(
        output=agent_output,
        success_criteria=oisspec.success_criteria,
        failure_criteria=oisspec.failure_criteria,
        prohibited_actions=oisspec.prohibited_actions,
    )
return OISpecVerdict(
compliant=verdict.passes_all_criteria,
alignment_score=verdict.alignment_score,
violations=verdict.detected_violations,
agent_id=oisspec.agent_id,
oisspec_version=oisspec.version,
)
3. Strategic evaluation. A workflow-level evaluator that assesses whether all agents collectively achieved the workflow's declared intent. This is the control that catches "every agent did its job, but the result is wrong."
The strategic evaluator receives the workflow OISpec, all agent OISpecs, agent outputs, tactical judge verdicts, and the audit trail. It evaluates whether the aggregate behaviour satisfies the workflow intent, looking specifically for emergent failures that individual tactical judges cannot detect.
Why Objective Intent matters for engineering: The existing controls catch what goes wrong mechanically. Objective Intent catches what goes wrong semantically: an agent operating outside its declared purpose, a workflow producing outcomes nobody specified, or a judge drifting from its evaluation criteria. For the full OISpec schema and tiered control specification, see Objective Intent.
Putting it together: the controlled chain¶
Here is how all the patterns combine for the Phantom Compliance scenario:
Trade Request arrives
│
├── Agent A: Data Enrichment
│ ├── Performs work, generates receipt
│ └── Output + receipt passed to boundary check
│
├── Boundary Check A→B
│ ├── Validates boundary contract (required fields, no PII)
│ ├── Checks Agent A receipt: integrity_pass == true
│ └── PASSES → Agent B receives output + receipt
│
├── Agent B: Compliance Check
│ ├── Scoped retrieval: gets 47 of 312 results
│ ├── Receipt records completeness_ratio: 0.15
│ ├── Receipt records confidence_gap: 0.79
│ ├── Receipt integrity_pass: FALSE
│ └── Output + receipt passed to boundary check
│
├── Boundary Check B→C
│ ├── Checks Agent B receipt: integrity_pass == FALSE
│ ├── Circuit breaker evaluates: failure count < threshold
│ └── Routes to PACE alternate path
│
├── PACE Alternate: Forced Full Retrieval
│ ├── Retries with bypass_cache=true, limit=10000
│ ├── Gets 312 of 312 results
│ ├── Receipt records completeness_ratio: 1.0
│ ├── Re-runs compliance check with full data
│ ├── Finds restricted security in result 204
│ ├── Returns FLAGGED (not CLEAR)
│ └── Receipt integrity_pass: TRUE
│
├── Boundary Check B→C (second attempt)
│ ├── Checks receipt: integrity_pass == TRUE
│ └── PASSES → Agent C receives FLAGGED result
│
└── Agent C: Approval Decision
├── Receives FLAGGED compliance status
├── Decision: REJECT trade, escalate to compliance officer
└── Trade blocked. Incident logged. No damage.
Total added latency for the alternate path: approximately 2-4 seconds (one additional retrieval + LLM inference). Total damage prevented: one trade involving a restricted security.
Reflection
Consider your own agent framework. Where are the agent boundaries in your system? Are those boundaries explicit (you can point to the code where Agent A's output becomes Agent B's input) or implicit (the framework handles it internally)? If implicit, how would you insert a boundary check?
Consider
If your boundaries are implicit (common in LangChain and AutoGen), you may need to refactor to make them explicit before you can insert MASO controls. This is a one-time architectural change, not an ongoing burden. The refactoring pattern is: (1) identify where agent outputs become inputs, (2) insert an explicit handoff function at that point, (3) add receipt checking and boundary enforcement in the handoff function.
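Step (2) of that refactoring pattern can be sketched as a single choke point where Agent A's output becomes Agent B's input. The function below is illustrative; check_receipt and enforce_boundary stand in for the receipt checking and boundary enforcement patterns earlier in this module.

```python
# Explicit handoff function: the one place where MASO controls run as
# data crosses an agent boundary.
def handoff(output, *, check_receipt, enforce_boundary):
    """Verify the receipt, enforce the boundary, then pass downstream."""
    if not check_receipt(output.get("receipt")):
        raise RuntimeError("handoff blocked: integrity check failed")
    return enforce_boundary(output)

# Passing receipt: output flows through to the next agent
result = handoff(
    {"receipt": {"integrity_pass": True}, "content": "enriched trade data"},
    check_receipt=lambda r: bool(r and r.get("integrity_pass")),
    enforce_boundary=lambda o: o,  # identity stub for the demo
)

# Failing receipt: the handoff halts the chain
try:
    handoff(
        {"receipt": {"integrity_pass": False}, "content": "stale data"},
        check_receipt=lambda r: bool(r and r.get("integrity_pass")),
        enforce_boundary=lambda o: o,
    )
    blocked = False
except RuntimeError:
    blocked = True
```

Once this function exists, every control in this module (receipt checks, boundary contracts, PACE routing) has a single, testable insertion point.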