Skip to content

Interrupt & Timeout

Source: examples/interrupt_timeout.py

This example demonstrates the 5 agent-specific CSP extensions through practical AI agent scenarios. Each operator addresses a real coordination problem that arises in multi-agent systems.


1. Interrupt: Priority Override

An agent performing a long-running task needs to be preemptible by higher-priority requests.

process_data = Event("process_data")
return_result = Event("return_result")
handle_priority = Event("handle_priority")
urgent_response = Event("urgent_response")

# Normal task: process data, then return result
task = Prefix(process_data, Prefix(return_result, Stop()))

# Priority handler: handle urgent request
handler = Prefix(handle_priority, Prefix(urgent_response, Stop()))

# Interruptible task: can be preempted at any point
interruptible = Interrupt(primary=task, handler=handler)

print(f"Can do: {interruptible.initials()}")
# process_data (normal) OR handle_priority (interrupt)

The Interrupt operator wraps a primary process so that at any point during its execution, a handler process can take over. The initials include both the primary's next event (process_data) and the handler's trigger (handle_priority). After a normal event, the process remains interruptible. After the interrupt fires, the handler runs exclusively -- the primary is abandoned.

This models scenarios like: an agent analyzing a document can be interrupted by a high-priority user query.

2. Timeout: Bounded LLM Call

LLM API calls can hang indefinitely. A timeout ensures the system always makes progress.

call_llm = Event("call_gpt4")
parse_response = Event("parse_response")
return_cached = Event("return_cached")

# LLM call process
llm_call = Prefix(call_llm, Prefix(parse_response, Stop()))

# Fallback: return cached response
fallback = Prefix(return_cached, Stop())

# Bounded execution: 30 second timeout
bounded = Timeout(process=llm_call, duration=30.0, fallback=fallback)

print(f"Initials: {bounded.initials()}")
# call_gpt4 (start LLM call) OR tau_timeout (timeout fires)

The Timeout operator adds a special TIMEOUT_EVENT (tau) to the initials. If the timeout fires (modeled as a tau transition), the fallback process takes over. If the primary process makes progress, the timeout remains active for subsequent steps. The duration is metadata for analysis -- the CSP semantics use the nondeterministic choice between progress and timeout.

After timeout, the fallback runs (return_cached). After the LLM call starts, the process remains a Timeout instance -- the timeout can still fire during parse_response.

3. Guard: Budget-Gated Agent

Expensive LLM calls should only proceed when budget allows.

budget = {"remaining": 100.0}

expensive_call = Event("call_gpt4o")
process = Event("process_result")

expensive_agent = Prefix(expensive_call, Prefix(process, Stop()))

# Only activate if budget allows
guarded = Guard(
    condition=lambda: budget["remaining"] > 0,
    process=expensive_agent,
)

print(f"Budget > 0: initials = {guarded.initials()}")
# {call_gpt4o}

budget["remaining"] = 0
print(f"Budget = 0: initials = {guarded.initials()}")
# frozenset() -- acts as Stop

budget["remaining"] = 50.0
print(f"Budget restored: initials = {guarded.initials()}")
# {call_gpt4o} again

The Guard operator evaluates a condition (a boolean or callable). When the condition is true, the guarded process behaves normally. When false, it behaves like Stop -- no events are available. The condition can be a lambda that checks runtime state, making it dynamic.

This models resource-constrained agents: only activate expensive capabilities when budget, rate limits, or other conditions are met.

4. Rename: Protocol Bridging

Two agents use different event vocabularies. Rename translates between them.

task_done = Event("task_done")
agent_a = Prefix(task_done, Stop())

work_complete = Event("work_complete")

# Bridge: rename task_done -> work_complete
bridged = Rename.from_dict(agent_a, {task_done: work_complete})

print(f"Original initials: {agent_a.initials()}")   # {task_done}
print(f"Bridged initials: {bridged.initials()}")     # {work_complete}

Rename applies a mapping from old events to new events across the entire process. The original event disappears from the alphabet and is replaced by the new one. Rename.from_dict() is a convenience constructor that accepts a dictionary mapping.

This models protocol bridging in heterogeneous multi-agent systems: Agent A emits task_done but Agent B expects work_complete. The rename adapter makes them compatible without modifying either agent.

5. Pipe: RAG Pipeline

A retrieval-augmented generation pipeline connects a retriever to a processor, hiding the internal channel.

query = Event("query")
emit_docs = Event("emit_docs")
summarize = Event("summarize")

# Retriever: receives query, emits documents
retriever = Prefix(query, Prefix(emit_docs, Stop()))

# Processor: receives documents, produces summary
processor = Prefix(emit_docs, Prefix(summarize, Stop()))

# Pipeline: emit_docs is internal channel
rag_pipeline = Pipe(
    producer=retriever,
    consumer=processor,
    channel=frozenset({emit_docs}),
)

print(f"Pipeline alphabet: {rag_pipeline.alphabet()}")
# {query, summarize} -- emit_docs is hidden

Pipe composes two processes by synchronizing on channel events and then hiding those events from the external alphabet. The result is a pipeline where only the endpoints are visible: query goes in, summarize comes out, and emit_docs is an internal handoff.

This is equivalent to Hiding(Parallel(retriever, processor, sync_set={emit_docs}), hidden={emit_docs}) but expressed as a single, intention-revealing operator.

6. Analysis

The example concludes by analyzing the constructed processes.

lts = build_lts(interruptible)
t = list(traces(lts, max_length=5))
print(f"Interruptible traces ({len(t)}): {t[:5]}")

print(f"Timeout process deadlock-free: {is_deadlock_free(bounded)}")
print(f"Pipeline deadlock-free: {is_deadlock_free(rag_pipeline)}")

The interruptible process has more traces than the plain task because at each step, the interrupt can fire. The timeout process is not deadlock-free (it eventually reaches Stop), but it guarantees progress -- every path leads to a terminal action rather than getting stuck waiting for an LLM response. The pipeline is analyzed for deadlocks to verify the synchronization is correct.


Complete source
"""Interrupt, Timeout, Guard -- Agent-specific CSP extensions.

Demonstrates the 5 new operators for real agent coordination scenarios.
"""

from agenticraft_foundation import (
    TIMEOUT_EVENT,
    Event,
    Guard,
    Interrupt,
    Pipe,
    Prefix,
    Rename,
    Stop,
    Timeout,
    build_lts,
    is_deadlock_free,
    traces,
)

# =============================================================
# Scenario 1: Interruptible Agent Task
# =============================================================
print("=== Interrupt: Priority Override ===")

process_data = Event("process_data")
return_result = Event("return_result")
handle_priority = Event("handle_priority")
urgent_response = Event("urgent_response")

# Normal task: process data, then return result
task = Prefix(process_data, Prefix(return_result, Stop()))

# Priority handler: handle urgent request
handler = Prefix(handle_priority, Prefix(urgent_response, Stop()))

# Interruptible task: can be preempted at any point
interruptible = Interrupt(primary=task, handler=handler)

print(f"Can do: {interruptible.initials()}")
# process_data (normal) OR handle_priority (interrupt)

# After normal event, still interruptible
after_normal = interruptible.after(process_data)
print(f"After process_data: {after_normal}")
print(f"Still interruptible: {isinstance(after_normal, Interrupt)}")

# After interrupt event, handler takes over
after_interrupt = interruptible.after(handle_priority)
print(f"After interrupt: {after_interrupt}")
print(f"Handler active: {not isinstance(after_interrupt, Interrupt)}")

# =============================================================
# Scenario 2: LLM Call with Timeout
# =============================================================
print("\n=== Timeout: Bounded LLM Call ===")

call_llm = Event("call_gpt4")
parse_response = Event("parse_response")
return_cached = Event("return_cached")

# LLM call process
llm_call = Prefix(call_llm, Prefix(parse_response, Stop()))

# Fallback: return cached response
fallback = Prefix(return_cached, Stop())

# Bounded execution: 30 second timeout
bounded = Timeout(process=llm_call, duration=30.0, fallback=fallback)

print(f"Initials: {bounded.initials()}")
# call_gpt4 (start LLM call) OR tau_timeout (timeout fires)

# If timeout fires, switch to fallback
after_timeout = bounded.after(TIMEOUT_EVENT)
print(f"After timeout: {after_timeout}")
assert return_cached in after_timeout.initials()

# If LLM starts, still has timeout
after_start = bounded.after(call_llm)
print(f"After call_llm: {after_start}")
assert isinstance(after_start, Timeout)

# =============================================================
# Scenario 3: Conditional Agent Activation
# =============================================================
print("\n=== Guard: Budget-Gated Agent ===")

budget = {"remaining": 100.0}

expensive_call = Event("call_gpt4o")
process = Event("process_result")

expensive_agent = Prefix(expensive_call, Prefix(process, Stop()))

# Only activate if budget allows
guarded = Guard(
    condition=lambda: budget["remaining"] > 0,
    process=expensive_agent,
)

print(f"Budget > 0: initials = {guarded.initials()}")
assert expensive_call in guarded.initials()

# Deplete budget
budget["remaining"] = 0
print(f"Budget = 0: initials = {guarded.initials()}")
assert guarded.initials() == frozenset()  # Acts as Stop

# Restore budget
budget["remaining"] = 50.0
print(f"Budget restored: initials = {guarded.initials()}")
assert expensive_call in guarded.initials()

# =============================================================
# Scenario 4: Protocol Bridging via Rename
# =============================================================
print("\n=== Rename: Protocol Bridging ===")

# Agent A uses "task_done" event
task_done = Event("task_done")
agent_a = Prefix(task_done, Stop())

# Agent B expects "work_complete" event
work_complete = Event("work_complete")

# Bridge: rename task_done -> work_complete
bridged = Rename.from_dict(agent_a, {task_done: work_complete})

print(f"Original initials: {agent_a.initials()}")
print(f"Bridged initials: {bridged.initials()}")
assert work_complete in bridged.initials()
assert task_done not in bridged.initials()

# =============================================================
# Scenario 5: RAG Pipeline
# =============================================================
print("\n=== Pipe: RAG Pipeline ===")

query = Event("query")
emit_docs = Event("emit_docs")
summarize = Event("summarize")

# Retriever: receives query, emits documents
retriever = Prefix(query, Prefix(emit_docs, Stop()))

# Processor: receives documents, produces summary
processor = Prefix(emit_docs, Prefix(summarize, Stop()))

# Pipeline: emit_docs is internal channel
rag_pipeline = Pipe(
    producer=retriever,
    consumer=processor,
    channel=frozenset({emit_docs}),
)

print(f"Pipeline alphabet: {rag_pipeline.alphabet()}")
assert query in rag_pipeline.alphabet()
assert summarize in rag_pipeline.alphabet()
assert emit_docs not in rag_pipeline.alphabet()  # Hidden!

# =============================================================
# Analysis
# =============================================================
print("\n=== Analysis ===")

lts = build_lts(interruptible)
t = list(traces(lts, max_length=5))
print(f"Interruptible traces ({len(t)}): {t[:5]}")

print(f"Timeout process deadlock-free: {is_deadlock_free(bounded)}")
print(f"Pipeline deadlock-free: {is_deadlock_free(rag_pipeline)}")

print("\nAll examples passed.")