Schemas Overview

Data models define the observable state shared between the execution engine, manager, and workers. The sections below highlight the most commonly used Pydantic models.
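
The import paths below mirror the source-file locations referenced on this page ("Source code in manager_agent_gym/schemas/..."); treat this as a sketch, since the package may also re-export these models from a higher-level namespace.

from manager_agent_gym.schemas.core.workflow import Workflow
from manager_agent_gym.schemas.core.tasks import Task
from manager_agent_gym.schemas.execution.manager import ManagerObservation
from manager_agent_gym.schemas.preferences.preference import PreferenceWeights
from manager_agent_gym.schemas.execution.manager_actions import ActionResult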

Workflow

The Workflow model represents the full task graph, resources, and constraint state that is passed to the execution engine.

Bases: BaseModel

Source code in manager_agent_gym/schemas/core/workflow.py
class Workflow(BaseModel):
    """
    A complete workflow containing tasks, agents, and execution state.

    This represents the core environment for the Manager Agent POSG.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Essential fields (previously from BaseEntity)
    id: UUID = Field(
        default_factory=uuid4,
        description="Unique identifier for this workflow instance.",
        examples=[str(uuid4())],
    )
    name: str = Field(
        ...,
        description="Human-readable name for dashboards and logs.",
        examples=["IPO Readiness"],
    )
    workflow_goal: str = Field(
        ...,
        description="Detailed objective and acceptance criteria for the workflow run.",
        examples=[
            "Objective: Secure AOC and Operating Licence. Acceptance: AOC issued, OL granted, inspections passed.",
        ],
    )
    owner_id: UUID = Field(
        ...,
        description="ID of the workflow owner (tenant/user)",
        examples=[str(uuid4())],
    )

    # Core POSG state components
    tasks: dict[UUID, Task] = Field(
        default_factory=dict,
        description="Task graph nodes (G). Keys are task UUIDs; values are Task models.",
        examples=[
            {str(uuid4()): Task(name="Draft plan", description="...").model_dump()}
        ],
    )
    resources: dict[UUID, Resource] = Field(
        default_factory=dict,
        description="Resource registry (R). Keys are resource UUIDs; values are Resource models.",
    )
    agents: dict[str, AgentInterface] = Field(
        default_factory=dict,
        description="Available agents (W). Keys are agent ids; values are live AgentInterface instances.",
    )
    messages: list[Message] = Field(
        default_factory=list,
        description="Communication history (C). Appended by the communication service.",
    )

    # Workflow metadata
    constraints: list[Constraint] = Field(
        default_factory=list,
        description="Hard/soft constraints used by evaluators and planning",
    )

    # Execution tracking
    started_at: datetime | None = Field(
        default=None, description="When execution started (set by engine)"
    )
    # Optional run seed for reproducibility; engine sets this when provided
    seed: int = Field(default=42, description="Run-level seed for reproducibility")
    completed_at: datetime | None = Field(
        default=None, description="When execution completed (set by engine)"
    )
    is_active: bool = Field(
        default=False, description="Whether the workflow is currently active"
    )

    # Metrics for evaluation
    total_cost: float = Field(
        default=0.0, description="Cumulative actual cost reported by agents"
    )
    # Sum of all task-level simulated durations (hours), reported by agents
    total_simulated_hours: float = Field(
        default=0.0,
        description="Total simulated time across all completed tasks (hours)",
    )

    @property
    def workflow_id(self) -> UUID:
        """Alias for id field to maintain compatibility."""
        return self.id

    @property
    def total_budget(self) -> float:
        """Sum of estimated costs across all tasks and nested subtasks."""
        total = 0.0
        for task in self.tasks.values():
            # include top-level estimate
            if task.estimated_cost is not None:
                try:
                    total += float(task.estimated_cost)
                except Exception:
                    pass
            # include nested subtasks
            for subtask in task.get_all_subtasks_flat():
                if subtask.estimated_cost is not None:
                    total += float(subtask.estimated_cost)

        return total

    @property
    def total_expected_hours(self) -> float:
        """Sum of estimated duration hours across all tasks and nested subtasks."""
        total = 0.0
        for task in self.tasks.values():
            if task.estimated_duration_hours is not None:
                total += float(task.estimated_duration_hours)
            for subtask in task.get_all_subtasks_flat():
                if subtask.estimated_duration_hours is not None:
                    total += float(subtask.estimated_duration_hours)
        return total

    def add_task(self, task: Task) -> None:
        """Add a task to the workflow."""
        self.tasks[task.id] = task

    def add_resource(self, resource: Resource) -> None:
        """Add a resource to the workflow."""
        self.resources[resource.id] = resource

    def get_task_output_resources(self, task: Task) -> list[Resource]:
        """Return realized output resources for a given task by id lookup."""
        results: list[Resource] = []
        for rid in task.output_resource_ids:
            res = self.resources.get(rid)
            if isinstance(res, Resource):
                results.append(res)
        return results

    def get_all_resources(self) -> list[Resource]:
        """Return all resources currently registered in the workflow."""
        return list(self.resources.values())

    def add_agent(self, agent: AgentInterface) -> None:
        """Add an agent to the workflow."""
        self.agents[agent.agent_id] = agent

    def get_ready_tasks(self) -> list[Task]:
        """Get atomic tasks that are ready to start (all dependencies satisfied).

        Recursively considers leaf subtasks and ensures they are registered as
        executable tasks so downstream systems can schedule and update them.
        """
        # 1) Propagate dependencies from composite parents down to all leaf subtasks,
        # then register any leaf subtasks into the top-level task registry so they
        # can be scheduled and updated consistently by the engine/manager actions.
        try:

            def _expand_to_leaf_dependencies(dep_ids: set[UUID]) -> set[UUID]:
                """Expand any composite task IDs in dep_ids into their atomic descendant IDs."""
                expanded: set[UUID] = set()
                for dep_id in dep_ids:
                    dep_task = self.find_task_by_id(dep_id)
                    if dep_task is None:
                        # Keep unknown dependency to let validation catch it later
                        expanded.add(dep_id)
                        continue
                    if dep_task.is_atomic_task():
                        expanded.add(dep_task.id)
                    else:
                        for leaf in dep_task.get_atomic_subtasks():
                            expanded.add(leaf.id)
                return expanded

            def _propagate_and_register(task: Task, inherited_deps: set[UUID]) -> None:
                # Aggregate dependencies from ancestors including this node
                aggregated_raw: set[UUID] = set(inherited_deps)
                aggregated_raw.update(task.dependency_task_ids)
                # Expand any composite dependencies to their leaf IDs
                aggregated_expanded = _expand_to_leaf_dependencies(aggregated_raw)

                # If composite, push aggregated deps to children and recurse
                if not task.is_atomic_task():
                    for child in task.subtasks:
                        child_deps_raw = set(child.dependency_task_ids)
                        child_deps_expanded = _expand_to_leaf_dependencies(
                            child_deps_raw
                        )
                        merged = child_deps_expanded.union(aggregated_expanded)
                        child.dependency_task_ids = list(merged)
                        _propagate_and_register(child, aggregated_expanded)
                else:
                    # Atomic leaf: rewrite its deps to the expanded set (including its own)
                    own_expanded = _expand_to_leaf_dependencies(
                        set(task.dependency_task_ids)
                    )
                    effective = own_expanded.union(aggregated_expanded)
                    task.dependency_task_ids = list(effective)
                    # Ensure it is available at top level for scheduling
                    if task.id not in self.tasks:
                        self.tasks[task.id] = task

            for root in list(self.tasks.values()):
                _propagate_and_register(root, set())
        except Exception:
            logger.error(
                "When trying to add dependancies from a parent to its children, an error occurred",
                exc_info=True,
            )
            # Defensive: if any malformed structure, fallback to current registry only
            pass

        # 2) Compute readiness based on completed tasks
        completed_task_ids = {
            tid
            for tid, task in self.tasks.items()
            if task.status == TaskStatus.COMPLETED
        }

        ready = []
        for task in self.tasks.values():
            if (
                task.status in (TaskStatus.PENDING, TaskStatus.READY)
                and task.is_atomic_task()
                and task.is_ready_to_start(completed_task_ids)
            ):
                # Mark as READY to make the state explicit for observers
                task.status = TaskStatus.READY
                ready.append(task)
        return ready

    def get_available_agents(self) -> list[AgentInterface]:
        """Get agents that are currently available for task assignment."""
        return [agent for agent in self.agents.values() if agent.is_available]

    def find_task_by_id(self, task_id: UUID) -> Task | None:
        """Find a task by ID in the workflow."""
        if task_id in self.tasks:
            return self.tasks[task_id]

        # Search in task hierarchies (subtasks)
        for task in self.tasks.values():
            found = task.find_task_by_id(task_id)
            if found:
                return found
        return None

    def is_complete(self) -> bool:
        """Check if all atomic tasks in the workflow are completed."""
        atomic_tasks = [task for task in self.tasks.values() if task.is_atomic_task()]
        if not atomic_tasks:
            return False
        return all(task.status == TaskStatus.COMPLETED for task in atomic_tasks)

    def get_task_dependencies_graph(self) -> dict[UUID, list[UUID]]:
        """Get the task dependency graph as adjacency list."""
        return {
            task_id: task.dependency_task_ids for task_id, task in self.tasks.items()
        }

    def validate_task_graph(self) -> bool:
        """Validate that the task graph is executable.

        Checks:
        - No cycles in the effective atomic dependency graph
        - All referenced dependencies exist in the workflow
        - The effective atomic-task dependency graph has at least one start
          and is acyclic (i.e., the workflow is completable without deadlock)
        """
        # Build a registry of all tasks including nested subtasks, and a parent map
        all_tasks: dict[UUID, Task] = {}
        parent_of: dict[UUID, UUID] = {}

        def register_task_tree(task: Task) -> None:
            all_tasks[task.id] = task
            for child in task.subtasks:
                parent_of[child.id] = task.id
                register_task_tree(child)

        for root in self.tasks.values():
            register_task_tree(root)

        # Identify atomic (leaf) tasks
        leaf_tasks = [t for t in all_tasks.values() if t.is_atomic_task()]
        if not leaf_tasks:
            return False

        leaf_ids = {t.id for t in leaf_tasks}

        def expand_to_leaf_ids(dep_ids: set[UUID]) -> set[UUID]:
            expanded: set[UUID] = set()
            for dep_id in dep_ids:
                dep_task = all_tasks.get(dep_id)
                if dep_task is None:
                    # Unknown dependency
                    expanded.add(dep_id)
                    continue
                if dep_task.is_atomic_task():
                    expanded.add(dep_task.id)
                else:
                    for leaf in dep_task.get_atomic_subtasks():
                        expanded.add(leaf.id)
            return expanded

        # Compute effective dependencies for leaves (including ancestors), expanded to leaves
        effective_deps: dict[UUID, set[UUID]] = {}
        for leaf in leaf_tasks:
            agg: set[UUID] = set(leaf.dependency_task_ids)
            ancestor_id = parent_of.get(leaf.id)
            while ancestor_id is not None:
                ancestor = all_tasks.get(ancestor_id)
                if ancestor is None:
                    break
                agg.update(ancestor.dependency_task_ids)
                ancestor_id = parent_of.get(ancestor_id)
            expanded = expand_to_leaf_ids(agg)
            # Validate that all deps exist and are leaf IDs after expansion
            for dep_id in expanded:
                if dep_id not in all_tasks:
                    return False
                if dep_id not in leaf_ids:
                    return False
            # Avoid self-dependency
            expanded.discard(leaf.id)
            effective_deps[leaf.id] = expanded

        # Build atomic graph
        indegree: dict[UUID, int] = {t.id: 0 for t in leaf_tasks}
        successors: dict[UUID, list[UUID]] = {t.id: [] for t in leaf_tasks}

        for leaf_id, deps in effective_deps.items():
            for dep_id in deps:
                indegree[leaf_id] += 1
                successors[dep_id].append(leaf_id)

        # Kahn's algorithm to detect cycles / ensure at least one start node
        queue: list[UUID] = [lid for lid, deg in indegree.items() if deg == 0]
        if not queue:
            return False

        processed = 0
        while queue:
            nid = queue.pop()
            processed += 1
            for m in successors.get(nid, []):
                indegree[m] -= 1
                if indegree[m] == 0:
                    queue.append(m)

        if processed < len(leaf_tasks):
            return False

        return True

    def pretty_print(
        self, include_resources: bool = True, max_preview_chars: int = 300
    ) -> str:
        """Return a human-readable summary of the workflow, tasks, and selected resources."""
        lines: list[str] = []
        lines.append("-" * 70)
        lines.append(f"Workflow: {self.name} (ID: {self.id})")
        lines.append(f"Goal: {self.workflow_goal}")
        lines.append(
            f"Budget (est): ${self.total_budget:.2f} | Expected hours: {self.total_expected_hours:.2f}"
        )
        lines.append(f"Cost (actual): ${self.total_cost:.2f}")
        lines.append(
            f"Agents: {len(self.agents)} | Resources: {len(self.resources)} | Tasks: {len(self.tasks)}"
        )
        lines.append("-" * 70)
        lines.append("Tasks:")
        for t in self.tasks.values():
            lines.append(t.pretty_print(indent=1))
        if include_resources and self.resources:
            lines.append("\nResources:")
            for r in self.resources.values():
                try:
                    lines.append(r.pretty_print(max_preview_chars=max_preview_chars))
                except Exception:
                    lines.append(f"Resource: {r.name} (ID: {r.id})")
        return "\n".join(lines)
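
As a quick orientation, here is a minimal sketch that builds a two-task workflow with one dependency and queries it. It is not taken from the library's examples and assumes only the fields and methods shown in the source above (plus the imports sketched in the overview).

from uuid import uuid4

# Hypothetical two-task workflow: "Review plan" depends on "Draft plan".
draft = Task(name="Draft plan", description="Write the first draft of the plan.")
review = Task(
    name="Review plan",
    description="Review and approve the draft.",
    dependency_task_ids=[draft.id],
)

workflow = Workflow(
    name="Plan review",
    workflow_goal="Produce an approved plan document.",
    owner_id=uuid4(),
)
workflow.add_task(draft)
workflow.add_task(review)

assert workflow.validate_task_graph()      # acyclic, all dependencies known
ready = workflow.get_ready_tasks()         # only "Draft plan" is ready
print([t.name for t in ready], workflow.is_complete())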

model_config = ConfigDict(arbitrary_types_allowed=True) class-attribute instance-attribute

A complete workflow containing tasks, agents, and execution state.

This represents the core environment for the Manager Agent POSG.

total_budget: float property

Sum of estimated costs across all tasks and nested subtasks.

total_expected_hours: float property

Sum of estimated duration hours across all tasks and nested subtasks.

workflow_id: UUID property

Alias for id field to maintain compatibility.

add_agent(agent: AgentInterface) -> None

Add an agent to the workflow.

Source code in manager_agent_gym/schemas/core/workflow.py
def add_agent(self, agent: AgentInterface) -> None:
    """Add an agent to the workflow."""
    self.agents[agent.agent_id] = agent

add_resource(resource: Resource) -> None

Add a resource to the workflow.

Source code in manager_agent_gym/schemas/core/workflow.py
def add_resource(self, resource: Resource) -> None:
    """Add a resource to the workflow."""
    self.resources[resource.id] = resource

add_task(task: Task) -> None

Add a task to the workflow.

Source code in manager_agent_gym/schemas/core/workflow.py
def add_task(self, task: Task) -> None:
    """Add a task to the workflow."""
    self.tasks[task.id] = task

find_task_by_id(task_id: UUID) -> Task | None

Find a task by ID in the workflow.

Source code in manager_agent_gym/schemas/core/workflow.py
def find_task_by_id(self, task_id: UUID) -> Task | None:
    """Find a task by ID in the workflow."""
    if task_id in self.tasks:
        return self.tasks[task_id]

    # Search in task hierarchies (subtasks)
    for task in self.tasks.values():
        found = task.find_task_by_id(task_id)
        if found:
            return found
    return None

get_all_resources() -> list[Resource]

Return all resources currently registered in the workflow.

Source code in manager_agent_gym/schemas/core/workflow.py
def get_all_resources(self) -> list[Resource]:
    """Return all resources currently registered in the workflow."""
    return list(self.resources.values())

get_available_agents() -> list[AgentInterface]

Get agents that are currently available for task assignment.

Source code in manager_agent_gym/schemas/core/workflow.py
def get_available_agents(self) -> list[AgentInterface]:
    """Get agents that are currently available for task assignment."""
    return [agent for agent in self.agents.values() if agent.is_available]

get_ready_tasks() -> list[Task]

Get atomic tasks that are ready to start (all dependencies satisfied).

Recursively considers leaf subtasks and ensures they are registered as executable tasks so downstream systems can schedule and update them.

Source code in manager_agent_gym/schemas/core/workflow.py
def get_ready_tasks(self) -> list[Task]:
    """Get atomic tasks that are ready to start (all dependencies satisfied).

    Recursively considers leaf subtasks and ensures they are registered as
    executable tasks so downstream systems can schedule and update them.
    """
    # 1) Propagate dependencies from composite parents down to all leaf subtasks,
    # then register any leaf subtasks into the top-level task registry so they
    # can be scheduled and updated consistently by the engine/manager actions.
    try:

        def _expand_to_leaf_dependencies(dep_ids: set[UUID]) -> set[UUID]:
            """Expand any composite task IDs in dep_ids into their atomic descendant IDs."""
            expanded: set[UUID] = set()
            for dep_id in dep_ids:
                dep_task = self.find_task_by_id(dep_id)
                if dep_task is None:
                    # Keep unknown dependency to let validation catch it later
                    expanded.add(dep_id)
                    continue
                if dep_task.is_atomic_task():
                    expanded.add(dep_task.id)
                else:
                    for leaf in dep_task.get_atomic_subtasks():
                        expanded.add(leaf.id)
            return expanded

        def _propagate_and_register(task: Task, inherited_deps: set[UUID]) -> None:
            # Aggregate dependencies from ancestors including this node
            aggregated_raw: set[UUID] = set(inherited_deps)
            aggregated_raw.update(task.dependency_task_ids)
            # Expand any composite dependencies to their leaf IDs
            aggregated_expanded = _expand_to_leaf_dependencies(aggregated_raw)

            # If composite, push aggregated deps to children and recurse
            if not task.is_atomic_task():
                for child in task.subtasks:
                    child_deps_raw = set(child.dependency_task_ids)
                    child_deps_expanded = _expand_to_leaf_dependencies(
                        child_deps_raw
                    )
                    merged = child_deps_expanded.union(aggregated_expanded)
                    child.dependency_task_ids = list(merged)
                    _propagate_and_register(child, aggregated_expanded)
            else:
                # Atomic leaf: rewrite its deps to the expanded set (including its own)
                own_expanded = _expand_to_leaf_dependencies(
                    set(task.dependency_task_ids)
                )
                effective = own_expanded.union(aggregated_expanded)
                task.dependency_task_ids = list(effective)
                # Ensure it is available at top level for scheduling
                if task.id not in self.tasks:
                    self.tasks[task.id] = task

        for root in list(self.tasks.values()):
            _propagate_and_register(root, set())
    except Exception:
        logger.error(
            "When trying to add dependancies from a parent to its children, an error occurred",
            exc_info=True,
        )
        # Defensive: if any malformed structure, fallback to current registry only
        pass

    # 2) Compute readiness based on completed tasks
    completed_task_ids = {
        tid
        for tid, task in self.tasks.items()
        if task.status == TaskStatus.COMPLETED
    }

    ready = []
    for task in self.tasks.values():
        if (
            task.status in (TaskStatus.PENDING, TaskStatus.READY)
            and task.is_atomic_task()
            and task.is_ready_to_start(completed_task_ids)
        ):
            # Mark as READY to make the state explicit for observers
            task.status = TaskStatus.READY
            ready.append(task)
    return ready
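
In an engine loop this is typically paired with assignment. The sketch below is illustrative only; it assumes a workflow built as in the earlier example and relies on AgentInterface exposing agent_id and is_available, as used elsewhere in this class.

# Hypothetical scheduling step: hand each ready task to the first available agent.
for task in workflow.get_ready_tasks():
    available = workflow.get_available_agents()
    if not available:
        break
    task.assigned_agent_id = available[0].agent_id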

get_task_dependencies_graph() -> dict[UUID, list[UUID]]

Get the task dependency graph as adjacency list.

Source code in manager_agent_gym/schemas/core/workflow.py
def get_task_dependencies_graph(self) -> dict[UUID, list[UUID]]:
    """Get the task dependency graph as adjacency list."""
    return {
        task_id: task.dependency_task_ids for task_id, task in self.tasks.items()
    }

get_task_output_resources(task: Task) -> list[Resource]

Return realized output resources for a given task by id lookup.

Source code in manager_agent_gym/schemas/core/workflow.py
def get_task_output_resources(self, task: Task) -> list[Resource]:
    """Return realized output resources for a given task by id lookup."""
    results: list[Resource] = []
    for rid in task.output_resource_ids:
        res = self.resources.get(rid)
        if isinstance(res, Resource):
            results.append(res)
    return results

is_complete() -> bool

Check if all atomic tasks in the workflow are completed.

Source code in manager_agent_gym/schemas/core/workflow.py
def is_complete(self) -> bool:
    """Check if all atomic tasks in the workflow are completed."""
    atomic_tasks = [task for task in self.tasks.values() if task.is_atomic_task()]
    if not atomic_tasks:
        return False
    return all(task.status == TaskStatus.COMPLETED for task in atomic_tasks)

pretty_print(include_resources: bool = True, max_preview_chars: int = 300) -> str

Return a human-readable summary of the workflow, tasks, and selected resources.

Source code in manager_agent_gym/schemas/core/workflow.py
def pretty_print(
    self, include_resources: bool = True, max_preview_chars: int = 300
) -> str:
    """Return a human-readable summary of the workflow, tasks, and selected resources."""
    lines: list[str] = []
    lines.append("-" * 70)
    lines.append(f"Workflow: {self.name} (ID: {self.id})")
    lines.append(f"Goal: {self.workflow_goal}")
    lines.append(
        f"Budget (est): ${self.total_budget:.2f} | Expected hours: {self.total_expected_hours:.2f}"
    )
    lines.append(f"Cost (actual): ${self.total_cost:.2f}")
    lines.append(
        f"Agents: {len(self.agents)} | Resources: {len(self.resources)} | Tasks: {len(self.tasks)}"
    )
    lines.append("-" * 70)
    lines.append("Tasks:")
    for t in self.tasks.values():
        lines.append(t.pretty_print(indent=1))
    if include_resources and self.resources:
        lines.append("\nResources:")
        for r in self.resources.values():
            try:
                lines.append(r.pretty_print(max_preview_chars=max_preview_chars))
            except Exception:
                lines.append(f"Resource: {r.name} (ID: {r.id})")
    return "\n".join(lines)

validate_task_graph() -> bool

Validate that the task graph is executable.

Checks:

- No cycles in the effective atomic dependency graph
- All referenced dependencies exist in the workflow
- The effective atomic-task dependency graph has at least one start and is acyclic (i.e., the workflow is completable without deadlock)

Source code in manager_agent_gym/schemas/core/workflow.py
def validate_task_graph(self) -> bool:
    """Validate that the task graph is executable.

    Checks:
    - No cycles in the effective atomic dependency graph
    - All referenced dependencies exist in the workflow
    - The effective atomic-task dependency graph has at least one start
      and is acyclic (i.e., the workflow is completable without deadlock)
    """
    # Build a registry of all tasks including nested subtasks, and a parent map
    all_tasks: dict[UUID, Task] = {}
    parent_of: dict[UUID, UUID] = {}

    def register_task_tree(task: Task) -> None:
        all_tasks[task.id] = task
        for child in task.subtasks:
            parent_of[child.id] = task.id
            register_task_tree(child)

    for root in self.tasks.values():
        register_task_tree(root)

    # Identify atomic (leaf) tasks
    leaf_tasks = [t for t in all_tasks.values() if t.is_atomic_task()]
    if not leaf_tasks:
        return False

    leaf_ids = {t.id for t in leaf_tasks}

    def expand_to_leaf_ids(dep_ids: set[UUID]) -> set[UUID]:
        expanded: set[UUID] = set()
        for dep_id in dep_ids:
            dep_task = all_tasks.get(dep_id)
            if dep_task is None:
                # Unknown dependency
                expanded.add(dep_id)
                continue
            if dep_task.is_atomic_task():
                expanded.add(dep_task.id)
            else:
                for leaf in dep_task.get_atomic_subtasks():
                    expanded.add(leaf.id)
        return expanded

    # Compute effective dependencies for leaves (including ancestors), expanded to leaves
    effective_deps: dict[UUID, set[UUID]] = {}
    for leaf in leaf_tasks:
        agg: set[UUID] = set(leaf.dependency_task_ids)
        ancestor_id = parent_of.get(leaf.id)
        while ancestor_id is not None:
            ancestor = all_tasks.get(ancestor_id)
            if ancestor is None:
                break
            agg.update(ancestor.dependency_task_ids)
            ancestor_id = parent_of.get(ancestor_id)
        expanded = expand_to_leaf_ids(agg)
        # Validate that all deps exist and are leaf IDs after expansion
        for dep_id in expanded:
            if dep_id not in all_tasks:
                return False
            if dep_id not in leaf_ids:
                return False
        # Avoid self-dependency
        expanded.discard(leaf.id)
        effective_deps[leaf.id] = expanded

    # Build atomic graph
    indegree: dict[UUID, int] = {t.id: 0 for t in leaf_tasks}
    successors: dict[UUID, list[UUID]] = {t.id: [] for t in leaf_tasks}

    for leaf_id, deps in effective_deps.items():
        for dep_id in deps:
            indegree[leaf_id] += 1
            successors[dep_id].append(leaf_id)

    # Kahn's algorithm to detect cycles / ensure at least one start node
    queue: list[UUID] = [lid for lid, deg in indegree.items() if deg == 0]
    if not queue:
        return False

    processed = 0
    while queue:
        nid = queue.pop()
        processed += 1
        for m in successors.get(nid, []):
            indegree[m] -= 1
            if indegree[m] == 0:
                queue.append(m)

    if processed < len(leaf_tasks):
        return False

    return True
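
For example, a two-task cycle fails validation. This is a minimal sketch using only constructors shown on this page.

from uuid import uuid4

a = Task(name="A", description="First task")
b = Task(name="B", description="Second task", dependency_task_ids=[a.id])
a.dependency_task_ids.append(b.id)  # introduce a cycle: A -> B -> A

wf = Workflow(name="Cycle demo", workflow_goal="Demonstrate validation", owner_id=uuid4())
wf.add_task(a)
wf.add_task(b)

assert wf.validate_task_graph() is False  # cycle: no start node, so Kahn's pass cannot begin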

Task

Tasks are the atomic or composite units of work that make up a workflow. They contain status, dependencies, and assignment metadata.

Bases: BaseModel

A task in the workflow system.

Tasks represent atomic units of work that can be assigned to agents. They form nodes in the task dependency graph (G in the POSG state).

Source code in manager_agent_gym/schemas/core/tasks.py
class Task(BaseModel):
    """
    A task in the workflow system.

    Tasks represent atomic units of work that can be assigned to agents.
    They form nodes in the task dependency graph (G in the POSG state).
    """

    id: UUID = Field(
        default_factory=uuid4,
        description="Unique identifier for the task",
    )
    name: str = Field(
        ...,
        description="Clear, descriptive task name",
        examples=["Draft technical memo"],
    )
    description: str = Field(
        ...,
        description="Detailed task description and objectives",
        examples=["Write a 2-page memo for execs."],
    )

    # Hierarchical structure
    subtasks: list["Task"] = Field(
        default_factory=list,
        description="Subtasks that make up this task (recursive structure)",
    )
    parent_task_id: UUID | None = Field(
        default=None, description="ID of parent task if this is a subtask"
    )

    # Dependencies and resources
    input_resource_ids: list[UUID] = Field(
        default_factory=list, description="IDs of required input resources"
    )
    output_resource_ids: list[UUID] = Field(
        default_factory=list, description="IDs of produced output resources"
    )
    dependency_task_ids: list[UUID] = Field(
        default_factory=list, description="Tasks that must complete before this one"
    )

    # Task metadata
    status: TaskStatus = Field(
        default=TaskStatus.PENDING, description="Execution status of the task"
    )
    assigned_agent_id: str | None = Field(
        default=None, description="ID of the agent currently assigned to this task"
    )

    # Execution tracking
    execution_notes: list[str] = Field(
        default_factory=list,
        description="Free-form execution notes and manager instructions",
    )
    estimated_duration_hours: float | None = Field(
        default=None, description="Estimated duration in hours"
    )
    actual_duration_hours: float | None = Field(
        default=None, description="Actual duration in hours (reported by agent)"
    )
    estimated_cost: float | None = Field(
        default=None, description="Estimated cost in currency units"
    )
    actual_cost: float | None = Field(
        default=None, description="Actual cost in currency units (reported by agent)"
    )
    quality_score: float | None = Field(
        default=None, description="Quality assessment [0,1]"
    )

    # Timestamps
    started_at: datetime | None = Field(default=None)
    completed_at: datetime | None = Field(default=None)
    deps_ready_at: datetime | None = Field(
        default=None, description="When all dependencies became satisfied"
    )

    # Derived, reporting-only composite status for UX/manager (scheduler ignores this)
    effective_status: str | None = Field(
        default=None,
        description="Derived status for composites based on descendant leaves; for leaves equals status.",
    )

    @property
    def task_id(self) -> UUID:
        """Alias for id field to maintain compatibility."""
        return self.id

    def is_ready_to_start(self, completed_task_ids: set[UUID]) -> bool:
        """Check if all dependencies are satisfied."""
        return all(dep_id in completed_task_ids for dep_id in self.dependency_task_ids)

    def is_composite_task(self) -> bool:
        """Check if this task has subtasks (composite task)."""
        return len(self.subtasks) > 0

    def calculate_coordination_deadtime_seconds(self) -> float:
        """
        Calculate coordination deadtime for this task in seconds.

        Deadtime = max(0, start_time - deps_ready_time)
        Returns 0.0 if timestamps are not available.

        Returns:
            Coordination deadtime in seconds
        """
        if self.started_at is None or self.deps_ready_at is None:
            return 0.0

        deadtime_seconds = (self.started_at - self.deps_ready_at).total_seconds()
        return max(0.0, deadtime_seconds)

    def is_atomic_task(self) -> bool:
        """Check if this task has no subtasks (atomic task)."""
        return len(self.subtasks) == 0

    def get_all_subtasks_flat(self) -> list["Task"]:
        """Get all subtasks in a flat list (recursive)."""
        all_subtasks = []
        for subtask in self.subtasks:
            all_subtasks.append(subtask)
            all_subtasks.extend(subtask.get_all_subtasks_flat())
        return all_subtasks

    def get_atomic_subtasks(self) -> list["Task"]:
        """Get only the atomic (leaf) subtasks."""
        atomic_tasks = []
        for subtask in self.subtasks:
            if subtask.is_atomic_task():
                atomic_tasks.append(subtask)
            else:
                atomic_tasks.extend(subtask.get_atomic_subtasks())
        return atomic_tasks

    def add_subtask(self, subtask: "Task") -> None:
        """Add a subtask and set its parent reference."""
        subtask.parent_task_id = self.id
        self.subtasks.append(subtask)

    def remove_subtask(self, subtask_id: UUID) -> bool:
        """Remove a subtask by ID. Returns True if found and removed."""
        for i, subtask in enumerate(self.subtasks):
            if subtask.id == subtask_id:
                self.subtasks.pop(i)
                return True
            # Check recursively in subtasks
            if subtask.remove_subtask(subtask_id):
                return True
        return False

    def find_task_by_id(self, task_id: UUID) -> "Task | None":
        """Find a task by ID in this task tree."""
        if self.id == task_id:
            return self
        for subtask in self.subtasks:
            found = subtask.find_task_by_id(task_id)
            if found:
                return found
        return None

    def sync_embedded_tasks_with_registry(
        self, task_registry: dict[UUID, "Task"]
    ) -> None:
        """
        Synchronize embedded subtasks with the authoritative task registry.

        This fixes the bug where embedded subtasks become stale when the registry is updated.
        Recursively updates all embedded subtasks to match their registry counterparts.

        Args:
            task_registry: Dictionary mapping task IDs to authoritative Task objects
        """
        for i, subtask in enumerate(self.subtasks):
            if subtask.id in task_registry:
                # Replace embedded subtask with the authoritative version from registry
                registry_task = task_registry[subtask.id]
                self.subtasks[i] = registry_task

                # Recursively sync any nested subtasks
                registry_task.sync_embedded_tasks_with_registry(task_registry)
            else:
                # Subtask not in registry - still sync its nested children if any
                subtask.sync_embedded_tasks_with_registry(task_registry)

    def pretty_print(self, indent: int = 0) -> str:
        """Return a human-readable summary of the task and selected fields."""
        prefix = "  " * indent
        lines: list[str] = []
        lines.append(f"{prefix}• Task: {self.name} (ID: {self.id})")
        lines.append(f"{prefix}  Status: {self.status.value}")
        if self.description:
            lines.append(f"{prefix}  Description: {self.description}")
        if self.estimated_duration_hours is not None:
            lines.append(
                f"{prefix}  Estimated hours: {self.estimated_duration_hours:.2f}"
            )
        if self.actual_duration_hours is not None:
            lines.append(f"{prefix}  Actual hours: {self.actual_duration_hours:.2f}")
        if self.estimated_cost is not None:
            lines.append(f"{prefix}  Estimated cost: ${float(self.estimated_cost):.2f}")
        if self.actual_cost is not None:
            lines.append(f"{prefix}  Actual cost: ${float(self.actual_cost):.2f}")
        if self.dependency_task_ids:
            dep_ids = ", ".join(str(d) for d in self.dependency_task_ids)
            lines.append(f"{prefix}  Depends on: {dep_ids}")
        if self.input_resource_ids:
            lines.append(
                f"{prefix}  Input resources: {[str(r) for r in self.input_resource_ids]}"
            )
        if self.output_resource_ids:
            lines.append(
                f"{prefix}  Output resources: {[str(r) for r in self.output_resource_ids]}"
            )
        if self.subtasks:
            lines.append(f"{prefix}  Subtasks:")
            for st in self.subtasks:
                lines.append(st.pretty_print(indent + 2))
        return "\n".join(lines)
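
A brief sketch of the hierarchical structure, assuming nothing beyond the fields and helpers shown above:

parent = Task(name="Prepare launch", description="Composite launch-prep task")
parent.add_subtask(Task(name="Write checklist", description="Draft the launch checklist"))
parent.add_subtask(Task(name="Dry run", description="Run a full rehearsal"))

assert parent.is_composite_task()
assert [t.name for t in parent.get_atomic_subtasks()] == ["Write checklist", "Dry run"]
print(parent.pretty_print())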

task_id: UUID property

Alias for id field to maintain compatibility.

add_subtask(subtask: Task) -> None

Add a subtask and set its parent reference.

Source code in manager_agent_gym/schemas/core/tasks.py
def add_subtask(self, subtask: "Task") -> None:
    """Add a subtask and set its parent reference."""
    subtask.parent_task_id = self.id
    self.subtasks.append(subtask)

calculate_coordination_deadtime_seconds() -> float

Calculate coordination deadtime for this task in seconds.

Deadtime = max(0, start_time - deps_ready_time)

Returns 0.0 if timestamps are not available.

Returns:

float: Coordination deadtime in seconds

Source code in manager_agent_gym/schemas/core/tasks.py
def calculate_coordination_deadtime_seconds(self) -> float:
    """
    Calculate coordination deadtime for this task in seconds.

    Deadtime = max(0, start_time - deps_ready_time)
    Returns 0.0 if timestamps are not available.

    Returns:
        Coordination deadtime in seconds
    """
    if self.started_at is None or self.deps_ready_at is None:
        return 0.0

    deadtime_seconds = (self.started_at - self.deps_ready_at).total_seconds()
    return max(0.0, deadtime_seconds)
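
A worked example of the deadtime computation (timestamps chosen purely for illustration):

from datetime import datetime

t = Task(name="Approve budget", description="Sign off on the Q3 budget")
t.deps_ready_at = datetime(2024, 1, 1, 9, 0, 0)   # dependencies satisfied at 09:00
t.started_at = datetime(2024, 1, 1, 9, 45, 0)     # work actually began at 09:45

assert t.calculate_coordination_deadtime_seconds() == 2700.0  # 45 minutes of waiting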

find_task_by_id(task_id: UUID) -> Task | None

Find a task by ID in this task tree.

Source code in manager_agent_gym/schemas/core/tasks.py
def find_task_by_id(self, task_id: UUID) -> "Task | None":
    """Find a task by ID in this task tree."""
    if self.id == task_id:
        return self
    for subtask in self.subtasks:
        found = subtask.find_task_by_id(task_id)
        if found:
            return found
    return None

get_all_subtasks_flat() -> list[Task]

Get all subtasks in a flat list (recursive).

Source code in manager_agent_gym/schemas/core/tasks.py
def get_all_subtasks_flat(self) -> list["Task"]:
    """Get all subtasks in a flat list (recursive)."""
    all_subtasks = []
    for subtask in self.subtasks:
        all_subtasks.append(subtask)
        all_subtasks.extend(subtask.get_all_subtasks_flat())
    return all_subtasks

get_atomic_subtasks() -> list[Task]

Get only the atomic (leaf) subtasks.

Source code in manager_agent_gym/schemas/core/tasks.py
def get_atomic_subtasks(self) -> list["Task"]:
    """Get only the atomic (leaf) subtasks."""
    atomic_tasks = []
    for subtask in self.subtasks:
        if subtask.is_atomic_task():
            atomic_tasks.append(subtask)
        else:
            atomic_tasks.extend(subtask.get_atomic_subtasks())
    return atomic_tasks

is_atomic_task() -> bool

Check if this task has no subtasks (atomic task).

Source code in manager_agent_gym/schemas/core/tasks.py
def is_atomic_task(self) -> bool:
    """Check if this task has no subtasks (atomic task)."""
    return len(self.subtasks) == 0

is_composite_task() -> bool

Check if this task has subtasks (composite task).

Source code in manager_agent_gym/schemas/core/tasks.py
def is_composite_task(self) -> bool:
    """Check if this task has subtasks (composite task)."""
    return len(self.subtasks) > 0

is_ready_to_start(completed_task_ids: set[UUID]) -> bool

Check if all dependencies are satisfied.

Source code in manager_agent_gym/schemas/core/tasks.py
def is_ready_to_start(self, completed_task_ids: set[UUID]) -> bool:
    """Check if all dependencies are satisfied."""
    return all(dep_id in completed_task_ids for dep_id in self.dependency_task_ids)

pretty_print(indent: int = 0) -> str

Return a human-readable summary of the task and selected fields.

Source code in manager_agent_gym/schemas/core/tasks.py
def pretty_print(self, indent: int = 0) -> str:
    """Return a human-readable summary of the task and selected fields."""
    prefix = "  " * indent
    lines: list[str] = []
    lines.append(f"{prefix}• Task: {self.name} (ID: {self.id})")
    lines.append(f"{prefix}  Status: {self.status.value}")
    if self.description:
        lines.append(f"{prefix}  Description: {self.description}")
    if self.estimated_duration_hours is not None:
        lines.append(
            f"{prefix}  Estimated hours: {self.estimated_duration_hours:.2f}"
        )
    if self.actual_duration_hours is not None:
        lines.append(f"{prefix}  Actual hours: {self.actual_duration_hours:.2f}")
    if self.estimated_cost is not None:
        lines.append(f"{prefix}  Estimated cost: ${float(self.estimated_cost):.2f}")
    if self.actual_cost is not None:
        lines.append(f"{prefix}  Actual cost: ${float(self.actual_cost):.2f}")
    if self.dependency_task_ids:
        dep_ids = ", ".join(str(d) for d in self.dependency_task_ids)
        lines.append(f"{prefix}  Depends on: {dep_ids}")
    if self.input_resource_ids:
        lines.append(
            f"{prefix}  Input resources: {[str(r) for r in self.input_resource_ids]}"
        )
    if self.output_resource_ids:
        lines.append(
            f"{prefix}  Output resources: {[str(r) for r in self.output_resource_ids]}"
        )
    if self.subtasks:
        lines.append(f"{prefix}  Subtasks:")
        for st in self.subtasks:
            lines.append(st.pretty_print(indent + 2))
    return "\n".join(lines)

remove_subtask(subtask_id: UUID) -> bool

Remove a subtask by ID. Returns True if found and removed.

Source code in manager_agent_gym/schemas/core/tasks.py
def remove_subtask(self, subtask_id: UUID) -> bool:
    """Remove a subtask by ID. Returns True if found and removed."""
    for i, subtask in enumerate(self.subtasks):
        if subtask.id == subtask_id:
            self.subtasks.pop(i)
            return True
        # Check recursively in subtasks
        if subtask.remove_subtask(subtask_id):
            return True
    return False

sync_embedded_tasks_with_registry(task_registry: dict[UUID, Task]) -> None

Synchronize embedded subtasks with the authoritative task registry.

This fixes the bug where embedded subtasks become stale when the registry is updated. Recursively updates all embedded subtasks to match their registry counterparts.

Parameters:

task_registry (dict[UUID, Task], required): Dictionary mapping task IDs to authoritative Task objects

Source code in manager_agent_gym/schemas/core/tasks.py
def sync_embedded_tasks_with_registry(
    self, task_registry: dict[UUID, "Task"]
) -> None:
    """
    Synchronize embedded subtasks with the authoritative task registry.

    This fixes the bug where embedded subtasks become stale when the registry is updated.
    Recursively updates all embedded subtasks to match their registry counterparts.

    Args:
        task_registry: Dictionary mapping task IDs to authoritative Task objects
    """
    for i, subtask in enumerate(self.subtasks):
        if subtask.id in task_registry:
            # Replace embedded subtask with the authoritative version from registry
            registry_task = task_registry[subtask.id]
            self.subtasks[i] = registry_task

            # Recursively sync any nested subtasks
            registry_task.sync_embedded_tasks_with_registry(task_registry)
        else:
            # Subtask not in registry - still sync its nested children if any
            subtask.sync_embedded_tasks_with_registry(task_registry)
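
A small sketch of the synchronization pattern: the registry holds the authoritative copy and the embedded subtask is swapped out for it. The TaskStatus import path is assumed here; the enum itself is used throughout tasks.py.

from manager_agent_gym.schemas.core.tasks import TaskStatus  # assumed import path

parent = Task(name="Release", description="Ship v1.0")
child = Task(name="Write changelog", description="Summarise the changes")
parent.add_subtask(child)

# Authoritative registry copy with an updated status (e.g. maintained by the engine).
updated_child = child.model_copy(deep=True)
updated_child.status = TaskStatus.COMPLETED

parent.sync_embedded_tasks_with_registry({updated_child.id: updated_child})
assert parent.subtasks[0].status == TaskStatus.COMPLETED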

Manager Observation

ManagerObservation describes what a manager sees at each timestep, including available tasks, resources, and communications.

Bases: BaseModel

Observation provided to manager agent at each timestep.

Source code in manager_agent_gym/schemas/execution/manager.py
class ManagerObservation(BaseModel):
    """Observation provided to manager agent at each timestep."""

    # Allow non-Pydantic types like AgentInterface in fields
    model_config = ConfigDict(arbitrary_types_allowed=True)
    workflow_summary: str
    timestep: int = Field(..., description="Current timestep number")
    workflow_id: UUID = Field(..., description="ID of the workflow being executed")
    execution_state: str = Field(..., description="Current execution state")
    task_status_counts: dict[str, int] = Field(
        default_factory=dict, description="Count of tasks by status"
    )
    ready_task_ids: list[UUID] = Field(
        default_factory=list, description="Tasks ready to start"
    )
    running_task_ids: list[UUID] = Field(
        default_factory=list, description="Currently running tasks"
    )
    completed_task_ids: list[UUID] = Field(
        default_factory=list, description="Completed task IDs"
    )
    failed_task_ids: list[UUID] = Field(
        default_factory=list, description="Failed task IDs"
    )
    available_agent_metadata: list[AgentConfig] = Field(
        default_factory=list, description="Available agent metadata"
    )
    recent_messages: list[Message] = Field(
        default_factory=list, description="Recent communications"
    )
    workflow_progress: float = Field(
        ..., ge=0.0, le=1.0, description="Completion percentage"
    )
    observation_timestamp: datetime = Field(default_factory=datetime.now)

    # Optional timeline awareness
    max_timesteps: int | None = Field(
        default=None, description="Configured maximum timesteps for this run"
    )
    timesteps_remaining: int | None = Field(
        default=None, description="Remaining timesteps before reaching the limit"
    )
    time_progress: float | None = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Fraction of timestep budget consumed (0..1)",
    )

    # Constraints visibility
    constraints: list[Constraint] = Field(
        default_factory=list, description="Workflow constraints (hard/soft/etc.)"
    )

    # Dynamic ID universes for schema-constrained action generation
    # These allow the manager agents to constrain IDs to valid values at generation time
    task_ids: list[UUID] = Field(
        default_factory=list, description="All task IDs currently in the workflow"
    )
    resource_ids: list[UUID] = Field(
        default_factory=list, description="All resource IDs currently in the workflow"
    )
    agent_ids: list[str] = Field(
        default_factory=list, description="All agent IDs registered in the workflow"
    )

    stakeholder_profile: StakeholderPublicProfile = Field(
        description="Public stakeholder profile",
    )
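
Observations are produced by the execution engine; a manager agent typically only reads them. The helper below is illustrative, not part of the library, and touches only fields defined above.

def summarise_observation(obs: ManagerObservation) -> str:
    """Illustrative helper: condense an observation into a one-line status string."""
    remaining = (
        f", {obs.timesteps_remaining} timesteps left"
        if obs.timesteps_remaining is not None
        else ""
    )
    return (
        f"t={obs.timestep} [{obs.execution_state}] "
        f"{obs.workflow_progress:.0%} complete, "
        f"{len(obs.ready_task_ids)} ready / {len(obs.running_task_ids)} running"
        f"{remaining}"
    )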

Preferences

PreferenceWeights encodes stakeholder priorities across objectives such as quality, time, cost, and oversight.

Bases: BaseModel

A collection of multi-objective preference weights for workflow optimization. Weights are automatically normalized to sum to 1.0 upon initialization.

Source code in manager_agent_gym/schemas/preferences/preference.py
class PreferenceWeights(BaseModel):
    """
    A collection of multi-objective preference weights for workflow optimization.
    Weights are automatically normalized to sum to 1.0 upon initialization.
    """

    preferences: List[Preference] = Field(
        default_factory=list, description="List of preference dimensions"
    )
    timestep: int = Field(
        default=0, description="Timestep at which these preferences apply"
    )

    @model_validator(mode="after")
    def normalize_weights(self) -> "PreferenceWeights":
        total_weight = sum(p.weight for p in self.preferences)
        if total_weight > 0:
            for p in self.preferences:
                p.weight = p.weight / total_weight
        elif self.preferences:
            equal_weight = 1.0 / len(self.preferences)
            for p in self.preferences:
                p.weight = equal_weight
        return self

    def get_preference_names(self) -> List[str]:
        """Get all preference dimension names."""
        return [pref.name for pref in self.preferences]

    def get_preference_dict(self) -> dict[str, float]:
        """Get preferences as a dictionary mapping name to normalized weight."""
        return {pref.name: pref.weight for pref in self.preferences}

    def normalize(self) -> "PreferenceWeights":
        """Return a new PreferenceWeights with normalized weights."""
        return PreferenceWeights(preferences=[p.model_copy() for p in self.preferences])

    def get_preference_summary(self) -> str:
        """Get a summary of the preferences."""
        return "\n".join([f"{pref.name}: {pref.weight}" for pref in self.preferences])
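
A short sketch of the normalization behaviour. It assumes Preference is exported from the same module and can be constructed from just a name and a weight, as the validator above relies on; any additional required fields would need to be supplied.

from manager_agent_gym.schemas.preferences.preference import Preference  # assumed export

weights = PreferenceWeights(
    preferences=[
        Preference(name="quality", weight=2.0),
        Preference(name="cost", weight=1.0),
        Preference(name="time", weight=1.0),
    ]
)

# Weights are renormalized on construction: 2.0/4.0, 1.0/4.0, 1.0/4.0.
assert weights.get_preference_dict() == {"quality": 0.5, "cost": 0.25, "time": 0.25}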

get_preference_dict() -> dict[str, float]

Get preferences as a dictionary mapping name to normalized weight.

Source code in manager_agent_gym/schemas/preferences/preference.py
def get_preference_dict(self) -> dict[str, float]:
    """Get preferences as a dictionary mapping name to normalized weight."""
    return {pref.name: pref.weight for pref in self.preferences}

get_preference_names() -> List[str]

Get all preference dimension names.

Source code in manager_agent_gym/schemas/preferences/preference.py
def get_preference_names(self) -> List[str]:
    """Get all preference dimension names."""
    return [pref.name for pref in self.preferences]

get_preference_summary() -> str

Get a summary of the preferences.

Source code in manager_agent_gym/schemas/preferences/preference.py
def get_preference_summary(self) -> str:
    """Get a summary of the preferences."""
    return "\n".join([f"{pref.name}: {pref.weight}" for pref in self.preferences])

normalize() -> PreferenceWeights

Return a new PreferenceWeights with normalized weights.

Source code in manager_agent_gym/schemas/preferences/preference.py
def normalize(self) -> "PreferenceWeights":
    """Return a new PreferenceWeights with normalized weights."""
    return PreferenceWeights(preferences=[p.model_copy() for p in self.preferences])

Manager Actions

ActionResult captures the outcome of manager-issued actions, including validation messages and state transitions.

Bases: BaseModel

Structured result returned by manager actions.

Example
ActionResult(
    action_type="assign_task",
    summary="Assigned T123 to ai_writer",
    kind="mutation",
    data={"task_id": "...", "agent_id": "ai_writer"},
    timestep=3,
    success=True,
)
Source code in manager_agent_gym/schemas/execution/manager_actions.py
class ActionResult(BaseModel):
    """Structured result returned by manager actions.

    Example:
        ```python
        ActionResult(
            action_type="assign_task",
            summary="Assigned T123 to ai_writer",
            kind="mutation",
            data={"task_id": "...", "agent_id": "ai_writer"},
            timestep=3,
            success=True,
        )
        ```
    """

    action_type: Literal[
        "assign_task",
        "assign_all_pending_tasks",
        "create_task",
        "remove_task",
        "send_message",
        "noop",
        "get_workflow_status",
        "get_available_agents",
        "get_pending_tasks",
        "refine_task",
        "add_task_dependency",
        "remove_task_dependency",
        "failed_action",
        "inspect_task",
        "request_end_workflow",
        "decompose_task",
        "assign_tasks_to_agents",
    ] = Field(description="Type of action result")
    summary: str = Field(description="Short summary of what happened / info returned")
    kind: Literal[
        "mutation", "info", "noop", "message", "inspection", "failed_action", "unknown"
    ] = Field(description="Type of action result")
    data: dict[str, Any] = Field(
        description="Optional structured payload for follow-up use (empty if not applicable)",
    )
    timestep: int | None = Field(
        default=None, description="Timestep of the action, set by the engine"
    )
    success: bool = Field(
        default=True, description="Whether the action succeeded (set by execute)"
    )
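
A sketch of how an engine or logger might branch on a result; the helper is illustrative and uses only the fields defined above.

def log_action_result(result: ActionResult) -> None:
    """Illustrative helper: route a manager action result to a log line."""
    status = "ok" if result.success else "FAILED"
    line = f"[{result.timestep}] {result.action_type} ({result.kind}) {status}: {result.summary}"
    if result.kind == "mutation" and result.data:
        line += f" | payload keys: {sorted(result.data)}"
    print(line)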