
4. MASO Controls


After this module you will be able to

  • Identify which MASO control domains require engineering implementation vs. policy configuration
  • Implement concrete control patterns for Execution Control, Observability, and Data Protection
  • Integrate MASO controls with LangGraph, AutoGen, and CrewAI agent frameworks
  • Build circuit breaker and PACE resilience patterns for agent chains
  • Map each control to the specific failure mode it addresses from Module 1

MASO from an engineering perspective

The AIRS framework defines MASO (Multi-Agent Security Operations) as controls across eight domains. As an engineering lead, you don't need to implement every control on day one. You need to know which domains require you to build something, which require you to configure something, and which require you to integrate with something your security team provides.

| Domain | Engineering work | Your role |
| --- | --- | --- |
| 1. Prompt, Goal & Epistemic Integrity | Build verification receipts (Module 3), retrieval completeness checks | Build |
| 2. Identity & Access | Integrate with IAM; scope agent permissions | Configure |
| 3. Data Protection | Build data boundary enforcement; control data flow between agents | Build |
| 4. Execution Control | Build circuit breakers, PACE patterns, delegation limits | Build |
| 5. Observability | Build chain-level metrics, integrity dashboards, alerting | Build |
| 6. Supply Chain | Integrate model provenance checks; pin versions | Configure |
| 7. Privileged Agent Governance | Build elevated oversight for agents with high-impact permissions | Build + Configure |
| 8. Objective Intent | Build OISpec schemas, tactical/strategic evaluation, judge monitoring | Build |

This module focuses on the four domains where you build: Epistemic Integrity (covered in Module 3), Data Protection, Execution Control, and Observability. Module 5 goes deeper on Observability with specific instrumentation patterns.


Execution Control: what you build

Execution control is the set of runtime mechanisms that stop, pause, or redirect an agent chain when something goes wrong. In traditional systems, you have circuit breakers, retries, and fallbacks. Agent chains need the same patterns, adapted for the specific failure modes from Module 1.

Pattern 1: The chain circuit breaker

A circuit breaker for agent chains works differently from a traditional circuit breaker. Traditional circuit breakers trip on error rates. Agent chain circuit breakers trip on integrity signals, because the failures you're catching don't produce errors.

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta

class BreakerState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Chain halted
    HALF_OPEN = "half_open"  # Testing recovery

@dataclass
class ChainCircuitBreaker:
    """Circuit breaker that trips on integrity failures,
    not just errors."""

    failure_threshold: int = 3
    recovery_timeout: timedelta = timedelta(minutes=5)
    state: BreakerState = BreakerState.CLOSED
    failure_count: int = 0
    last_failure: datetime = field(
        default_factory=lambda: datetime.min
    )

    def check_receipt(self, receipt) -> bool:
        """Evaluate a verification receipt.
        Returns True if the chain should proceed."""
        if self.state == BreakerState.OPEN:
            if datetime.utcnow() - self.last_failure > self.recovery_timeout:
                self.state = BreakerState.HALF_OPEN
            else:
                return False  # Chain blocked

        if not receipt.integrity_pass:
            self.failure_count += 1
            self.last_failure = datetime.utcnow()

            if self.failure_count >= self.failure_threshold:
                self.state = BreakerState.OPEN
                return False  # Trip the breaker

            return False  # Block this request but don't trip

        # Receipt passed
        if self.state == BreakerState.HALF_OPEN:
            self.state = BreakerState.CLOSED
            self.failure_count = 0

        return True

The critical difference from a traditional circuit breaker: this one evaluates verification receipts, not HTTP status codes. A request that returns 200 OK with a failed integrity check will trip the breaker.
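To make that trip condition concrete, here is a minimal sketch of the failure signal; the StepResult shape is hypothetical, not part of the receipt schema above:

```python
from dataclasses import dataclass

@dataclass
class StepResult:
    """Hypothetical shape of one chain step's outcome."""
    http_status: int
    integrity_pass: bool

def breaker_signal(result: StepResult) -> bool:
    """True if this result counts as a breaker failure.
    Transport success is irrelevant; only the receipt matters."""
    return not result.integrity_pass

# Three consecutive 200 OK responses with failed integrity checks
# are enough to trip a breaker with failure_threshold = 3:
results = [StepResult(200, False)] * 3
tripped = sum(breaker_signal(r) for r in results) >= 3
```

A traditional breaker keyed on status codes would see three healthy responses here and stay closed.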

Pattern 2: PACE resilience

PACE (Primary, Alternate, Contingency, Emergency) is a resilience pattern from the AIRS framework that gives agent chains structured fallback paths:

[Figure: PACE resilience flow, showing the Primary, Alternate, Contingency, and Emergency paths]

Here is a concrete implementation:

from dataclasses import dataclass
from typing import Callable

class ChainBlockedError(Exception):
    """Raised when the PACE emergency path halts the chain."""

@dataclass
class PACEConfig:
    alternate_fn: Callable      # Retry with different params
    contingency_fn: Callable    # Human escalation
    emergency_fn: Callable      # Hard block + alert
    max_alternate_retries: int = 2

async def execute_with_pace(
    primary_fn: Callable,
    pace: PACEConfig,
    chain_context: dict,
) -> dict:
    """Execute an agent step with PACE fallback."""

    # Primary path
    result = await primary_fn(chain_context)
    if result.receipt.integrity_pass:
        result.path_used = "primary"
        return result

    # Alternate path: retry with modifications
    for attempt in range(pace.max_alternate_retries):
        alt_context = {
            **chain_context,
            "force_full_retrieval": True,
            "bypass_cache": True,
            "attempt": attempt + 1,
        }
        result = await pace.alternate_fn(alt_context)
        if result.receipt.integrity_pass:
            result.path_used = f"alternate (attempt {attempt + 1})"
            return result

    # Contingency path: human review
    result.path_used = "contingency"
    human_decision = await pace.contingency_fn(result)
    if human_decision.approved:
        result.receipt.human_override = True
        return result

    # Emergency path: hard block
    result.path_used = "emergency"
    await pace.emergency_fn(result, chain_context)
    raise ChainBlockedError(
        f"Chain {chain_context['chain_id']} blocked by PACE emergency path"
    )

Why PACE matters: Without PACE, a failed integrity check has two options: proceed anyway or hard-fail. PACE gives you graduated responses. The alternate path often resolves transient issues (cache staleness, temporary retrieval limits). The contingency path handles genuine edge cases. The emergency path is your last resort. Most importantly, every path is logged and auditable.
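The escalation order can also be shown as a condensed, synchronous stand-in for execute_with_pace above; the stub paths and (ok, value) return shape are simplifications for illustration:

```python
def run_pace(primary, alternate, contingency, emergency, max_retries=2):
    """Condensed sketch of the PACE escalation order. Each path
    returns (ok, value); the emergency path ends in a hard block."""
    ok, value = primary()
    if ok:
        return "primary", value
    for attempt in range(max_retries):
        ok, value = alternate(attempt)
        if ok:
            return f"alternate:{attempt + 1}", value
    ok, value = contingency(value)
    if ok:
        return "contingency", value
    emergency(value)
    raise RuntimeError("blocked by PACE emergency path")

# Primary fails; the first alternate retry succeeds:
path, value = run_pace(
    primary=lambda: (False, None),
    alternate=lambda attempt: (attempt == 0, "full-retrieval result"),
    contingency=lambda v: (False, v),
    emergency=lambda v: None,
)
```

Note that the contingency and emergency lambdas are never reached in this run; they exist so every escalation level has a handler.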

Pattern 3: Delegation depth limits

To address the constraint dilution problem from Module 1, enforce hard limits on delegation chains:

from dataclasses import dataclass, field

@dataclass
class DelegationPolicy:
    max_depth: int = 3
    allowed_sub_agents: list[str] = field(default_factory=list)
    constraint_propagation: bool = True  # Sub-agents inherit constraints

def check_delegation(
    current_depth: int,
    target_agent: str,
    policy: DelegationPolicy,
    original_constraints: dict,
) -> tuple[bool, dict]:
    """Check whether a delegation is permitted and return
    the constraints to propagate."""

    if current_depth >= policy.max_depth:
        return False, {}

    if target_agent not in policy.allowed_sub_agents:
        return False, {}

    if policy.constraint_propagation:
        return True, original_constraints
    else:
        return True, {}  # Constraints not propagated (risky)

The constraint_propagation flag is critical. When set to True, the original request's constraints (user preferences, safety requirements, data access scopes) are passed to every sub-agent in the delegation chain. This directly addresses the "telephone game" problem from Module 1.
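A toy illustration of what propagation buys you; the helper and constraint names here are hypothetical:

```python
def constraints_at_depth(constraints: dict, depth: int,
                         max_depth: int = 3,
                         propagation: bool = True) -> dict:
    """What a sub-agent at `depth` receives under a delegation policy
    with the given settings (toy model of the semantics above)."""
    if depth > max_depth:
        raise PermissionError(f"delegation depth {depth} exceeds {max_depth}")
    return dict(constraints) if propagation else {}

original = {"data_scope": "customer:eu-only", "pii_allowed": False}

# With propagation on, the deepest permitted sub-agent still sees the
# original constraints; with it off, they silently vanish en route:
with_prop = constraints_at_depth(original, 3)
without_prop = constraints_at_depth(original, 3, propagation=False)
```

The second case is exactly the telephone-game failure: no error is raised, the sub-agent simply operates without the caller's constraints.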


Data Protection: what you build

Data Protection in the MASO framework goes beyond encryption and access control. For multi-agent systems, it means controlling what data crosses agent boundaries and ensuring data integrity is maintained throughout the chain.

Pattern 4: Agent boundary data contracts

Define explicit contracts for what data can flow between agents:

from dataclasses import dataclass

class BoundaryViolation(Exception):
    """Raised when an agent output violates its boundary contract."""

@dataclass
class AgentBoundaryContract:
    """Defines what data Agent A can send to Agent B."""

    source_agent: str
    target_agent: str
    allowed_fields: list[str]
    required_fields: list[str]
    max_payload_tokens: int
    pii_allowed: bool = False
    must_include_receipt: bool = True

def enforce_boundary(
    output: AgentOutput,
    contract: AgentBoundaryContract,
) -> AgentOutput:
    """Filter and validate agent output against the boundary
    contract before passing to the next agent."""

    if contract.must_include_receipt and output.receipt is None:
        raise BoundaryViolation("Receipt required but not present")

    # Check payload size
    token_count = count_tokens(output.content)
    if token_count > contract.max_payload_tokens:
        raise BoundaryViolation(
            f"Payload {token_count} tokens exceeds "
            f"limit {contract.max_payload_tokens}"
        )

    # Check required fields
    for field_name in contract.required_fields:
        if field_name not in output.structured_fields:
            raise BoundaryViolation(
                f"Required field '{field_name}' missing"
            )

    # PII check
    if not contract.pii_allowed and contains_pii(output.content):
        raise BoundaryViolation("PII detected in non-PII boundary")

    return output

Boundary contracts serve two purposes:

  1. Security: They prevent uncontrolled data flow between agents (PII leakage, scope creep)
  2. Integrity: The must_include_receipt flag ensures that verification receipts are never dropped as data moves through the chain
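One gap worth noting: enforce_boundary above validates required_fields but never applies allowed_fields. A complementary filter could close that gap; this is a sketch with hypothetical field names:

```python
def filter_to_allowed(structured_fields: dict, allowed_fields: list) -> dict:
    """Drop any field the contract does not explicitly allow, so
    undeclared data never crosses the agent boundary."""
    return {k: v for k, v in structured_fields.items() if k in allowed_fields}

output_fields = {
    "compliance_status": "FLAGGED",
    "analyst_notes": "restricted security in result 204",
    "customer_email": "should-not-cross@example.com",
}
crossed = filter_to_allowed(
    output_fields, ["compliance_status", "analyst_notes"]
)
```

Dropping silently (rather than raising) is a design choice: undeclared fields are treated as accidental leakage, while missing required fields remain a hard BoundaryViolation.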

Pattern 5: Retrieval scope enforcement

To prevent the truncated retrieval problem, enforce retrieval scope at the data layer:

from datetime import datetime

async def scoped_retrieval(
    query: str,
    source_id: str,
    scope_config: dict,
) -> tuple[list, DataSourceAccess]:
    """Perform a retrieval with scope enforcement.
    Returns results and a DataSourceAccess record for the receipt."""

    # Get expected count for this source
    expected_count = await get_expected_count(source_id, query)

    # Perform retrieval
    results = await vector_store.query(
        query=query,
        limit=scope_config.get("max_results", 1000),
    )

    # Build the data source access record
    access = DataSourceAccess(
        source_id=source_id,
        query=query,
        expected_count=expected_count,
        actual_count=len(results),
        freshness=datetime.utcnow(),
        truncated=len(results) < expected_count,
        truncation_reason=(
            "result_limit" if len(results) >= scope_config.get(
                "max_results", 1000
            )
            else "context_window_limit" if len(results) < expected_count
            else None
        ),
    )

    return results, access

The key insight: the retrieval function itself computes the completeness metadata. The agent doesn't need to know what "complete" means; the data layer enforces it and reports the results in a format the verification receipt can consume.
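Downstream, the receipt check can derive completeness directly from that record. A minimal sketch, using a stand-in AccessRecord and a hypothetical 0.95 threshold:

```python
from dataclasses import dataclass

@dataclass
class AccessRecord:
    """Minimal stand-in for the DataSourceAccess record above."""
    expected_count: int
    actual_count: int

def completeness_ratio(access: AccessRecord) -> float:
    if access.expected_count == 0:
        return 1.0  # nothing to retrieve counts as complete
    return access.actual_count / access.expected_count

# The Phantom Compliance numbers: 47 of 312 results retrieved
ratio = completeness_ratio(AccessRecord(expected_count=312, actual_count=47))
integrity_pass = ratio >= 0.95  # hypothetical threshold
```

With these numbers the ratio is roughly 0.15, so the receipt fails its integrity check regardless of whether the retrieval call itself succeeded.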


Integration with agent frameworks

The patterns above are framework-agnostic. Here is how they map to common agent frameworks.

LangGraph integration

LangGraph uses a state graph model where nodes are agent steps and edges are transitions. MASO controls integrate at the edge level:

from langgraph.graph import StateGraph

def build_controlled_graph():
    graph = StateGraph(ChainState)

    # Add agent nodes
    graph.add_node("agent_a", agent_a_node)
    graph.add_node("agent_b", agent_b_node)
    graph.add_node("agent_c", agent_c_node)

    # Add integrity check nodes between agents
    graph.add_node("check_a_to_b", integrity_check_node)
    graph.add_node("check_b_to_c", integrity_check_node)

    # Add PACE fallback nodes
    graph.add_node("alternate_b", alternate_retrieval_node)
    graph.add_node("escalate", human_escalation_node)
    graph.add_node("emergency_halt", emergency_halt_node)

    # Wire the graph: agent -> check -> next agent or fallback
    graph.add_edge("agent_a", "check_a_to_b")
    graph.add_conditional_edges(
        "check_a_to_b",
        route_on_integrity,
        {
            "pass": "agent_b",
            "fail_retry": "alternate_b",
            "fail_escalate": "escalate",
            "fail_block": "emergency_halt",
        },
    )

    graph.add_edge("agent_b", "check_b_to_c")
    graph.add_conditional_edges(
        "check_b_to_c",
        route_on_integrity,
        {
            "pass": "agent_c",
            "fail_retry": "alternate_b",
            "fail_escalate": "escalate",
            "fail_block": "emergency_halt",
        },
    )

    return graph.compile()

The integrity check sits on the graph edge. The conditional routing implements PACE: pass goes to the next agent, fail_retry goes to the alternate path, fail_escalate goes to contingency, fail_block goes to emergency.
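route_on_integrity is referenced above but not shown; one plausible shape, assuming the receipt and retry bookkeeping live in the graph state (the state keys here are hypothetical, so adjust them to your ChainState schema):

```python
def route_on_integrity(state: dict) -> str:
    """Map the latest receipt in graph state to a PACE route."""
    receipt = state.get("receipt")
    if receipt is None:
        return "fail_block"        # a missing receipt is itself a violation
    if receipt["integrity_pass"]:
        return "pass"
    if state.get("breaker_open", False):
        return "fail_block"        # emergency: breaker already tripped
    if state.get("alternate_attempts", 0) < 2:
        return "fail_retry"        # alternate path still available
    return "fail_escalate"         # contingency: human review
```

The return values must match the keys in the conditional-edge mapping exactly; LangGraph uses them verbatim to select the next node.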

AutoGen integration

AutoGen uses a conversational model where agents communicate via messages. MASO controls integrate as message interceptors:

from autogen import ConversableAgent

class IntegrityCheckingAgent(ConversableAgent):
    """Wrapper that adds receipt verification to any AutoGen agent."""

    def __init__(self, inner_agent, breaker, **kwargs):
        super().__init__(**kwargs)
        self.inner_agent = inner_agent
        self.breaker = breaker

    async def a_receive(self, message, sender, **kwargs):
        # Check incoming receipt
        receipt = message.get("_verification_receipt")
        if receipt and not self.breaker.check_receipt(receipt):
            return {
                "content": "CHAIN_HALTED: Upstream integrity check failed",
                "_chain_status": "halted",
                "_halt_reason": receipt.integrity_verdict,
            }

        # Process normally
        response = await self.inner_agent.a_receive(
            message, sender, **kwargs
        )

        # Attach outgoing receipt
        response["_verification_receipt"] = self.generate_receipt(
            message, response
        )
        return response

CrewAI integration

CrewAI uses a task-based model where agents are assigned tasks with specific tools. MASO controls integrate at the task execution level:

from crewai import Task

def create_controlled_task(
    agent,
    description,
    integrity_config,
):
    """Create a CrewAI task with integrity controls."""

    original_task = Task(
        description=description,
        agent=agent,
    )

    # Wrap the task execution with integrity checking
    original_execute = original_task.execute

    async def controlled_execute(*args, **kwargs):
        result = await original_execute(*args, **kwargs)

        # Generate receipt
        receipt = generate_receipt_from_task(
            original_task, result, integrity_config
        )

        # Check integrity
        if not receipt.integrity_pass:
            return handle_integrity_failure(
                result, receipt, integrity_config
            )

        result.receipt = receipt
        return result

    original_task.execute = controlled_execute
    return original_task

Framework integration principle: Regardless of the framework, MASO controls insert at agent boundaries, the points where data flows from one agent to another. The control checks the verification receipt, enforces boundary contracts, and routes to PACE fallback paths when integrity checks fail. The agents themselves don't need to be modified; the controls operate in the orchestration layer.


Objective Intent: what you build

The patterns above catch mechanical faults: incomplete retrieval, boundary violations, integrity failures. They do not evaluate whether the system is doing what the developer designed it to do. That requires Objective Intent, the newest MASO domain.

What you implement

As an engineering lead, Objective Intent means building three things:

1. OISpec schemas. Every agent, judge, and workflow gets a structured, version-controlled Objective Intent Specification. This is a typed JSON schema (not free-form prose) that declares the component's goal, success/failure criteria, permitted tools, prohibited actions, data scope, and authority limits.

@dataclass
class AgentOISpec:
    """Objective Intent Specification for a single agent."""

    agent_id: str
    goal: str
    success_criteria: list[str]
    failure_criteria: list[str]
    permitted_tools: list[str]
    prohibited_actions: list[str]
    data_scope: str
    authority: dict  # can_delegate, max_depth, can_create_agents
    risk_classification: str  # low, medium, high, critical
    evaluation_frequency: str  # every_action, per_phase, post_execution
    version: int
    oisspec_hash: str  # sha256 for integrity verification

2. Tactical evaluation. A per-agent judge that evaluates each agent's actions against its OISpec. This integrates at the same agent boundaries where you already insert receipt checks and boundary enforcement.

async def evaluate_against_oisspec(
    agent_output: AgentOutput,
    oisspec: AgentOISpec,
    judge_model,  # judge client exposing an async evaluate()
) -> OISpecVerdict:
    """Tactical evaluation: does this output comply
    with the agent's declared intent?"""

    verdict = await judge_model.evaluate(
        output=agent_output,
        success_criteria=oisspec.success_criteria,
        failure_criteria=oisspec.failure_criteria,
        prohibited_actions=oisspec.prohibited_actions,
    )

    return OISpecVerdict(
        compliant=verdict.passes_all_criteria,
        alignment_score=verdict.alignment_score,
        violations=verdict.detected_violations,
        agent_id=oisspec.agent_id,
        oisspec_version=oisspec.version,
    )

3. Strategic evaluation. A workflow-level evaluator that assesses whether all agents collectively achieved the workflow's declared intent. This is the control that catches "every agent did its job, but the result is wrong."

The strategic evaluator receives the workflow OISpec, all agent OISpecs, agent outputs, tactical judge verdicts, and the audit trail. It evaluates whether the aggregate behaviour satisfies the workflow intent, looking specifically for emergent failures that individual tactical judges cannot detect.
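A sketch of the aggregate verdict logic, with hypothetical types, to show what "every agent passed, the workflow failed" looks like as code:

```python
from dataclasses import dataclass

@dataclass
class TacticalVerdict:
    """Hypothetical per-agent judge verdict."""
    agent_id: str
    compliant: bool

def strategic_verdict(workflow_criteria_met: bool, verdicts: list) -> str:
    """Combine tactical verdicts with the workflow-level evaluation."""
    all_tactical_pass = all(v.compliant for v in verdicts)
    if all_tactical_pass and workflow_criteria_met:
        return "aligned"
    if all_tactical_pass:
        # Every agent did its job, but the result is wrong:
        return "emergent_failure"
    return "tactical_failure"

verdicts = [TacticalVerdict("agent_a", True), TacticalVerdict("agent_b", True)]
outcome = strategic_verdict(workflow_criteria_met=False, verdicts=verdicts)
```

The "emergent_failure" branch is the case no tactical judge can reach on its own, which is why the strategic evaluator needs the full workflow OISpec and audit trail as input.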

Why Objective Intent matters for engineering: The existing controls catch what goes wrong mechanically. Objective Intent catches what goes wrong semantically: an agent operating outside its declared purpose, a workflow producing outcomes nobody specified, or a judge drifting from its evaluation criteria. For the full OISpec schema and tiered control specification, see Objective Intent.


Putting it together: the controlled chain

Here is how all the patterns combine for the Phantom Compliance scenario:

Trade Request arrives
│
├── Agent A: Data Enrichment
│   ├── Performs work, generates receipt
│   └── Output + receipt passed to boundary check
│
├── Boundary Check A→B
│   ├── Validates boundary contract (required fields, no PII)
│   ├── Checks Agent A receipt: integrity_pass == true
│   └── PASSES → Agent B receives output + receipt
│
├── Agent B: Compliance Check
│   ├── Scoped retrieval: gets 47 of 312 results
│   ├── Receipt records completeness_ratio: 0.15
│   ├── Receipt records confidence_gap: 0.79
│   ├── Receipt integrity_pass: FALSE
│   └── Output + receipt passed to boundary check
│
├── Boundary Check B→C
│   ├── Checks Agent B receipt: integrity_pass == FALSE
│   ├── Circuit breaker evaluates: failure count < threshold
│   └── Routes to PACE alternate path
│
├── PACE Alternate: Forced Full Retrieval
│   ├── Retries with bypass_cache=true, limit=10000
│   ├── Gets 312 of 312 results
│   ├── Receipt records completeness_ratio: 1.0
│   ├── Re-runs compliance check with full data
│   ├── Finds restricted security in result 204
│   ├── Returns FLAGGED (not CLEAR)
│   └── Receipt integrity_pass: TRUE
│
├── Boundary Check B→C (second attempt)
│   ├── Checks receipt: integrity_pass == TRUE
│   └── PASSES → Agent C receives FLAGGED result
│
└── Agent C: Approval Decision
    ├── Receives FLAGGED compliance status
    ├── Decision: REJECT trade, escalate to compliance officer
    └── Trade blocked. Incident logged. No damage.

Total added latency for the alternate path: approximately 2-4 seconds (one additional retrieval + LLM inference). Total damage prevented: one trade involving a restricted security.


Reflection

Consider your own agent framework. Where are the agent boundaries in your system? Are those boundaries explicit (you can point to the code where Agent A's output becomes Agent B's input) or implicit (the framework handles it internally)? If implicit, how would you insert a boundary check?

Consider

If your boundaries are implicit (common in LangChain and AutoGen), you may need to refactor to make them explicit before you can insert MASO controls. This is a one-time architectural change, not an ongoing burden. The refactoring pattern is: (1) identify where agent outputs become inputs, (2) insert an explicit handoff function at that point, (3) add receipt checking and boundary enforcement in the handoff function.
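Step (2) of that pattern can be as small as a handoff function that threads the output through an ordered list of checks; the names here are hypothetical, and any check may raise to halt the chain:

```python
from typing import Any, Callable

def handoff(output: Any, checks: list) -> Any:
    """Explicit agent boundary: Agent A's output passes through each
    check (receipt verification, boundary contract, circuit breaker)
    before it becomes Agent B's input."""
    for check in checks:
        output = check(output)  # a check may transform the payload or raise
    return output

# Checks run in order and may annotate the payload:
result = handoff({"content": "x"}, [lambda o: {**o, "receipt_checked": True}])
```

Once this seam exists, adding or reordering MASO controls is a one-line change to the checks list rather than another refactor.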


Next: Instrumentation & Evidence →