# Agent control flow
!!! note "Understanding AI agent basics"
We also recently created a long tutorial on understanding the basics of building an AI agent: [View it here](https://minimal-agent.com).
!!! abstract "Understanding the default agent"
* This guide shows the control flow of the default agent.
* After this, you're ready to [remix & extend mini](cookbook.md)
The following diagram shows the control flow of the mini agent:
```mermaid
flowchart TD
subgraph run["Agent.run(task)"]
direction TB
A["Initialize messages"] --> B
B["Agent.step"] --> C{"Exception?"}
C -->|Yes| D["Agent.add_messages
(also re-raises exceptions that don't inherit from InterruptAgentFlow)"]
C -->|No| E{"messages[-1].role == exit?"}
D --> E
E -->|No| B
E -->|Yes| F["Return result"]
end
subgraph step["Agent.step()
Single iteration"]
direction TB
S1["Agent.query"] --> S2["Agent.execute_actions"]
end
subgraph query["Agent.query()
Also checks for cost limits
"]
direction TB
Q3["Model.query"] --> Q4["Agent.add_messages"]
end
subgraph execute_actions["Agent.execute_actions(message)"]
direction TB
E2["Environment.execute
Also raises the Submitted exception if we're done"] --> E3["Model.format_observation_messages"]
E3 --> E4["Agent.add_messages"]
end
B -.-> step
S1 -.-> query
S2 -.-> execute_actions
```
And here is the code that implements it:
??? note "Default agent class"
- [Read on GitHub](https://github.com/swe-agent/mini-swe-agent/blob/main/src/minisweagent/agents/default.py)
- [API reference](../reference/agents/default.md)
```python
--8<-- "src/minisweagent/agents/default.py"
```
Essentially, `DefaultAgent.run` calls `DefaultAgent.step` in a loop until the agent has finished its task.
The `step` method is the core of the agent:
```python
def step(self) -> list[dict]:
return self.execute_actions(self.query())
```
It does the following:
1. Queries the model for a response based on the current messages (`DefaultAgent.query`, calling `Model.query`)
2. Executes all actions in the response (`DefaultAgent.execute_actions`, calling `Environment.execute` for each action)
3. Formats the observation messages via `Model.format_observation_messages`
4. Adds the observations to the messages
Here's `query`:
```python
def query(self) -> dict:
# ... limit checks ...
message = self.model.query(self.messages)
self.add_messages(message)
return message
```
And `execute_actions`:
```python
def execute_actions(self, message: dict) -> list[dict]:
outputs = [self.env.execute(action) for action in message.get...
return self.add_messages(*self.model.format_observation_messages(...))
```
The interesting bit is how we handle error conditions and the finish condition:
This uses exceptions that inherit from `InterruptAgentFlow`. All these exceptions carry messages that get added to the trajectory.
- `Submitted` is raised when the agent has finished its task. For example, the environment checks if the command output starts with a magic string:
```python
# In Environment.execute
def _check_finished(self, output: dict):
lines = output.get("output", "").lstrip().splitlines(keepends=True)
if lines and lines[0].strip() == "COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT":
raise Submitted({"role": "exit", "content": ..., "extra": {...}})
```
- `LimitsExceeded` is raised when we hit a cost or step limit
- `FormatError` is raised when the output from the LM is not in the expected format
- `TimeoutError` is raised when the action took too long to execute
- `UserInterruption` is raised when the user interrupts the agent
The `DefaultAgent.run` method catches these exceptions and handles them by adding the corresponding messages to the messages list. The loop continues until a message with `role="exit"` is added.
```python
while True:
try:
self.step()
except InterruptAgentFlow as e:
self.add_messages(*e.messages)
if self.messages[-1].get("role") == "exit":
break
```
Using exceptions for the control flow is a lot easier than passing around flags and states, especially when extending or subclassing the agent.
{% include-markdown "_footer.md" %}