Execution Models
Execution Models
Execution-level data structures track state transitions, observations, and action
results as the engine runs.
Manager Observation
ManagerObservation captures the partial view presented to the manager each timestep.
Bases: BaseModel
Observation provided to manager agent at each timestep.
Source code in manager_agent_gym/schemas/execution/manager.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class ManagerObservation(BaseModel):
    """Observation provided to manager agent at each timestep.

    Bundles the workflow identity, task ID lists grouped by status, available
    agent metadata, recent messages, progress indicators, optional
    timestep-budget information, workflow constraints, the full ID universes
    for tasks/resources/agents, and the stakeholder's public profile.

    Required fields (no default): ``workflow_summary``, ``timestep``,
    ``workflow_id``, ``execution_state``, ``workflow_progress`` and
    ``stakeholder_profile``. All list/dict fields default to empty containers.
    """

    # Allow non-Pydantic types like AgentInterface in fields
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # --- Workflow identity and current engine state ---
    workflow_summary: str
    timestep: int = Field(..., description="Current timestep number")
    workflow_id: UUID = Field(..., description="ID of the workflow being executed")
    execution_state: str = Field(..., description="Current execution state")

    # --- Task bookkeeping, bucketed by status ---
    task_status_counts: dict[str, int] = Field(
        default_factory=dict, description="Count of tasks by status"
    )
    ready_task_ids: list[UUID] = Field(
        default_factory=list, description="Tasks ready to start"
    )
    running_task_ids: list[UUID] = Field(
        default_factory=list, description="Currently running tasks"
    )
    completed_task_ids: list[UUID] = Field(
        default_factory=list, description="Completed task IDs"
    )
    failed_task_ids: list[UUID] = Field(
        default_factory=list, description="Failed task IDs"
    )

    # --- Agents and communications ---
    available_agent_metadata: list[AgentConfig] = Field(
        default_factory=list, description="Available agent metadata"
    )
    recent_messages: list[Message] = Field(
        default_factory=list, description="Recent communications"
    )

    # The value is validated to lie in [0.0, 1.0], so it is a fraction rather
    # than a 0-100 percentage; the description says so explicitly, matching
    # the wording used for ``time_progress`` below.
    workflow_progress: float = Field(
        ..., ge=0.0, le=1.0, description="Completion fraction (0..1)"
    )
    # NOTE(review): datetime.now produces a naive, local-time timestamp;
    # confirm whether consumers expect UTC / timezone-aware values.
    observation_timestamp: datetime = Field(default_factory=datetime.now)

    # Optional timeline awareness
    max_timesteps: int | None = Field(
        default=None, description="Configured maximum timesteps for this run"
    )
    timesteps_remaining: int | None = Field(
        default=None, description="Remaining timesteps before reaching the limit"
    )
    time_progress: float | None = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of timestep budget consumed (0..1)",
    )

    # Constraints visibility
    constraints: list[Constraint] = Field(
        default_factory=list, description="Workflow constraints (hard/soft/etc.)"
    )

    # Dynamic ID universes for schema-constrained action generation
    # These allow the manager agents to constrain IDs to valid values at generation time
    task_ids: list[UUID] = Field(
        default_factory=list, description="All task IDs currently in the workflow"
    )
    resource_ids: list[UUID] = Field(
        default_factory=list, description="All resource IDs currently in the workflow"
    )
    agent_ids: list[str] = Field(
        default_factory=list, description="All agent IDs registered in the workflow"
    )

    # Required (no default); the ellipsis makes that explicit, consistent with
    # the other required fields above.
    stakeholder_profile: StakeholderPublicProfile = Field(
        ...,
        description="Public stakeholder profile",
    )
|
Execution State
ExecutionState enumerates the lifecycle states a workflow execution can be in
(for example initialized, running, completed, failed, or cancelled).
Bases: str, Enum
Possible states of workflow execution.
Source code in manager_agent_gym/schemas/execution/state.py
8
9
10
11
12
13
14
15
16
class ExecutionState(str, Enum):
    """Lifecycle states a workflow execution can be in.

    Because the enum also derives from ``str``, every member compares equal
    to its lowercase string value, e.g. ``ExecutionState.RUNNING == "running"``,
    and a member can be looked up from that string with
    ``ExecutionState("running")``.
    """

    # Start-up and in-progress states.
    INITIALIZED = "initialized"
    RUNNING = "running"
    WAITING_FOR_MANAGER = "waiting_for_manager"
    EXECUTING_TASKS = "executing_tasks"

    # NOTE(review): the remaining members look like terminal outcomes;
    # confirm against the engine's transition logic.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
Manager Action Result
ActionResult records validation feedback and side effects produced by manager actions.
Bases: BaseModel
Structured result returned by manager actions.
Example
ActionResult(
action_type="assign_task",
summary="Assigned T123 to ai_writer",
kind="mutation",
data={"task_id": "...", "agent_id": "ai_writer"},
timestep=3,
success=True,
)
Source code in manager_agent_gym/schemas/execution/manager_actions.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class ActionResult(BaseModel):
    """Structured result returned by manager actions.

    ``action_type`` and ``kind`` are restricted to the literal values listed
    on the fields. ``data`` carries an optional structured payload and
    defaults to an empty dict; ``timestep`` defaults to ``None`` and
    ``success`` defaults to ``True``.

    Example:
    ```python
    ActionResult(
        action_type="assign_task",
        summary="Assigned T123 to ai_writer",
        kind="mutation",
        data={"task_id": "...", "agent_id": "ai_writer"},
        timestep=3,
        success=True,
    )
    ```
    """

    # Which manager action produced this result.
    action_type: Literal[
        "assign_task",
        "assign_all_pending_tasks",
        "create_task",
        "remove_task",
        "send_message",
        "noop",
        "get_workflow_status",
        "get_available_agents",
        "get_pending_tasks",
        "refine_task",
        "add_task_dependency",
        "remove_task_dependency",
        "failed_action",
        "inspect_task",
        "request_end_workflow",
        "decompose_task",
        "assign_tasks_to_agents",
    ] = Field(description="Type of action result")
    summary: str = Field(description="Short summary of what happened / info returned")
    # Coarse category of the result. The description previously duplicated the
    # one on ``action_type``; it now describes this field.
    kind: Literal[
        "mutation", "info", "noop", "message", "inspection", "failed_action", "unknown"
    ] = Field(description="Category of the action result")
    # The description calls the payload optional and "empty if not
    # applicable", so an empty dict is supplied when the caller omits it.
    # default_factory avoids sharing one mutable dict across instances.
    data: dict[str, Any] = Field(
        default_factory=dict,
        description="Optional structured payload for follow-up use (empty if not applicable)",
    )
    timestep: int | None = Field(
        default=None, description="Timestep of the action, set by the engine"
    )
    success: bool = Field(
        default=True, description="Whether the action succeeded (set by execute)"
    )
|