Core Systems

The core namespace exposes the runtime building blocks that orchestrate workflows in Manager Agent Gym. Use the sections below to understand each component before diving into the full API reference generated by mkdocstrings.

Workflow Execution

WorkflowExecutionEngine advances the simulation timestep-by-timestep, manages task queues, and gathers metrics for analysis.

Timestep-based workflow execution engine.

Orchestrates agents and evaluations in a discrete-time loop. Tasks run concurrently, while a manager agent observes state and takes actions between timesteps. Produces rich artifacts (snapshots, metrics, logs) for analysis and benchmarking.
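
For finer-grained control than run_full_execution(), the engine can also be stepped manually via execute_timestep(). The sketch below is illustrative only and assumes an already-constructed engine (see the Example further down); note that run_full_execution() additionally performs a final evaluation pass and persists outputs, which this loop omits.

```python
# Sketch: driving the engine one timestep at a time.
# Assumes `engine` was built as in the Example below; output saving and the
# final evaluation pass performed by run_full_execution() are omitted here.
import asyncio


async def step_manually(engine) -> None:
    while not engine.workflow.is_complete() and engine.current_timestep < engine.max_timesteps:
        result = await engine.execute_timestep()  # advance the discrete-time loop by one step
        print(engine.current_timestep, engine.execution_state)


# asyncio.run(step_manually(engine))
```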

Parameters:

- workflow (Workflow, required): The workflow graph (tasks, resources, constraints).
- agent_registry (AgentRegistry, required): Dynamic registry used to join/leave agents.
- stakeholder_agent (StakeholderBase, required): Stakeholder simulator providing preferences and messages over time.
- manager_agent (ManagerAgent, required): The decision-making manager agent.
- seed (int, required): Global deterministic seed to propagate to components.
- evaluations (list[Evaluator] | None, default None): Optional workflow-level evaluators to run.
- output_config (OutputConfig | None, default None): Output directories and filenames.
- max_timesteps (int, default 50): Maximum number of timesteps to execute.
- enable_timestep_logging (bool, default True): Persist per-timestep snapshots and metrics.
- enable_final_metrics_logging (bool, default True): Persist final metrics and summary.
- communication_service (CommunicationService | None, default None): Message bus; a default service is created if not provided.
- timestep_end_callbacks (Sequence[Callable[[TimestepEndContext], Awaitable[None]]] | None, default None): Optional hooks fired at the end of each timestep (failures logged and ignored).
- log_preference_evaluation_progress (bool, default True): Show tqdm progress for preference evals.
- max_concurrent_rubrics (int, default 100): Concurrency limit for rubric evaluation.
- reward_aggregator (BaseRewardAggregator | None, default None): Aggregator used by the evaluator.
- reward_projection (RewardProjection | None, default None): Optional projection to scalar reward.

Attributes:

- current_timestep (int): Zero-based timestep index.
- execution_state (ExecutionState): Current engine state.
- timestep_results (list[ExecutionResult]): Accumulated per-timestep outputs.
- validation_engine (ValidationEngine): Evaluator used to compute rewards.
- communication_service (CommunicationService): Message hub used by agents.
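
After a run, these attributes can be inspected directly. A minimal sketch (reward_vector on the validation engine is referenced in the engine source; the printed fields are illustrative):

```python
# Sketch: inspecting engine state after a completed run.
results = await engine.run_full_execution()
print(engine.execution_state)                  # terminal state, e.g. COMPLETED or FAILED
print(len(engine.timestep_results))            # accumulated per-timestep outputs
print(engine.validation_engine.reward_vector)  # per-timestep rewards computed by the evaluator
```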

Example
engine = WorkflowExecutionEngine(
    workflow=my_workflow,
    agent_registry=registry,
    stakeholder_agent=stakeholder,
    manager_agent=manager,
    seed=42,
    evaluations=[...],
)
results = await engine.run_full_execution()
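
The timestep_end_callbacks parameter accepts async hooks that receive a TimestepEndContext after every timestep; exceptions raised inside a hook are logged and ignored rather than aborting the run. A minimal sketch (the context fields used below match those populated in the engine source; the import path for TimestepEndContext is not shown here and may vary):

```python
# Sketch: attaching an end-of-timestep hook via timestep_end_callbacks.
async def log_progress(ctx) -> None:
    print(
        f"timestep {ctx.timestep}: "
        f"started={len(ctx.tasks_started)} "
        f"completed={len(ctx.tasks_completed)} "
        f"failed={len(ctx.tasks_failed)}"
    )


engine = WorkflowExecutionEngine(
    workflow=my_workflow,
    agent_registry=registry,
    stakeholder_agent=stakeholder,
    manager_agent=manager,
    seed=42,
    timestep_end_callbacks=[log_progress],
)
```
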
Source code in manager_agent_gym/core/execution/engine.py
class WorkflowExecutionEngine:
    """Timestep-based workflow execution engine.

    Orchestrates agents and evaluations in a discrete-time loop. Tasks run
    concurrently, while a manager agent observes state and takes actions
    between timesteps. Produces rich artifacts (snapshots, metrics, logs)
    for analysis and benchmarking.

    Args:
        workflow (Workflow): The workflow graph (tasks, resources, constraints).
        agent_registry (AgentRegistry): Dynamic registry used to join/leave agents.
        stakeholder_agent (StakeholderBase): Stakeholder simulator providing
            preferences and messages over time.
        manager_agent (ManagerAgent): The decision-making manager agent.
        seed (int): Global deterministic seed to propagate to components.
        evaluations (list[Evaluator] | None): Optional workflow-level evaluators to run.
        output_config (OutputConfig | None): Output directories and filenames.
        max_timesteps (int): Maximum number of timesteps to execute.
        enable_timestep_logging (bool): Persist per-timestep snapshots and metrics.
        enable_final_metrics_logging (bool): Persist final metrics and summary.
        communication_service (CommunicationService | None): Message bus; a default
            service is created if not provided.
        timestep_end_callbacks (Sequence[Callable[[TimestepEndContext], Awaitable[None]]] | None):
            Optional hooks fired at the end of each timestep (failures logged and ignored).
        log_preference_evaluation_progress (bool): Show tqdm progress for preference evals.
        max_concurrent_rubrics (int): Concurrency limit for rubric evaluation.
        reward_aggregator (BaseRewardAggregator | None): Aggregator used by the evaluator.
        reward_projection (RewardProjection | None): Optional projection to scalar reward.

    Attributes:
        current_timestep (int): Zero-based timestep index.
        execution_state (ExecutionState): Current engine state.
        timestep_results (list[ExecutionResult]): Accumulated per-timestep outputs.
        validation_engine (ValidationEngine): Evaluator used to compute rewards.
        communication_service (CommunicationService): Message hub used by agents.

    Example:
        ```python
        engine = WorkflowExecutionEngine(
            workflow=my_workflow,
            agent_registry=registry,
            stakeholder_agent=stakeholder,
            manager_agent=manager,
            seed=42,
            evaluations=[...],
        )
        results = await engine.run_full_execution()
        ```
    """

    def __init__(
        self,
        workflow: Workflow,
        agent_registry: AgentRegistry,
        stakeholder_agent: StakeholderBase,
        manager_agent: ManagerAgent,
        seed: int,
        evaluations: list[Evaluator] | None = None,
        output_config: OutputConfig | None = None,
        max_timesteps: int = 50,
        enable_timestep_logging: bool = True,
        enable_final_metrics_logging: bool = True,
        communication_service: CommunicationService | None = None,
        timestep_end_callbacks: Sequence[
            Callable[[TimestepEndContext], Awaitable[None]]
        ]
        | None = None,
        # Preference evaluation controls
        log_preference_evaluation_progress: bool = True,
        max_concurrent_rubrics: int = 100,
        reward_aggregator: BaseRewardAggregator[object] | None = None,
        reward_projection: RewardProjection[object] | None = None,
    ):
        self.workflow = workflow
        self.agent_registry = agent_registry
        self.evaluations = list(evaluations or [])
        self.manager_agent = manager_agent
        self.stakeholder_agent: StakeholderBase = stakeholder_agent
        self.workflow.add_agent(stakeholder_agent)
        # Global run seed (for deterministic behavior where supported)
        self.seed: int = seed

        self.output_config = output_config or OutputConfig()
        self.enable_timestep_logging = enable_timestep_logging
        self.enable_final_metrics_logging = enable_final_metrics_logging
        self._timestep_end_callbacks: list[
            Callable[[TimestepEndContext], Awaitable[None]]
        ] = list(timestep_end_callbacks or [])

        # Execution state
        self.current_timestep = 0
        self.execution_state = ExecutionState.INITIALIZED
        self.timestep_results: list[ExecutionResult] = []

        # Task execution tracking
        self.running_tasks: dict[UUID, asyncio.Task] = {}
        self.completed_task_ids: set[UUID] = set()
        self.failed_task_ids: set[UUID] = set()

        self.max_timesteps = max_timesteps
        self._task_group: TaskGroup | None = None

        self.validation_engine = ValidationEngine(
            max_concurrent_rubrics=max_concurrent_rubrics,
            log_preference_progress=log_preference_evaluation_progress,
            reward_aggregator=reward_aggregator,
            reward_projection=reward_projection,
            seed=self.seed,
        )

        self.communication_service = (
            communication_service
            if communication_service is not None
            else CommunicationService()
        )
        # Inject communication service and propagate seed to all agents
        self._inject_communication_service()
        if self.manager_agent is None:
            raise ValueError("Manager agent must be provided")

        self.manager_agent.configure_seed(self.seed)
        self.stakeholder_agent.configure_seed(self.seed)

        self.output_writer = WorkflowSerialiser(
            self.output_config,
            self.communication_service,
            self.workflow,
        )

        self.preference_history: list[tuple[int, PreferenceWeights]] = []
        self.recent_preference_change: PreferenceChange | None = None
        self.preference_change_total_count: int = 0

        # Track which agents in the workflow were mirrored from the registry.
        # This lets us safely prune only those, leaving user-added agents intact.
        self._registry_mirrored_agent_ids: set[str] = set()

        # Ensure output directories exist (only if logging is enabled)
        if self.enable_timestep_logging or self.enable_final_metrics_logging:
            self.output_writer.ensure_directories()

        # Evaluation cadence configuration (default: BOTH) using rubric enum
        self.evaluation_cadence: RunCondition = RunCondition.ON_COMPLETION

    def restore_from_snapshot(self, snapshot_dir: str, timestep: int) -> None:
        """
        Restore engine state from a previous simulation snapshot.

        This updates the existing engine with state from a snapshot without
        reconstructing the entire engine. The engine should already be
        constructed with fresh components (workflow, agents, etc.).

        Args:
            snapshot_dir: Path to simulation run directory containing timestep_data/
            timestep: Target timestep to restore from
        """
        # Create and configure state restorer
        restorer = WorkflowStateRestorer(snapshot_dir, timestep)
        restorer.load_snapshot_data()

        logger.info("Restoring workflow state from timestep %s", timestep)

        # Restore all state components
        restorer.restore_workflow_state(self.workflow)
        restorer.restore_stakeholder_preferences(self.stakeholder_agent)
        restorer.restore_communication_history(self.communication_service)
        restorer.restore_manager_action_buffer(self.manager_agent)
        restorer.restore_active_agents(self.agent_registry)

        # Set current timestep and execution state
        self.current_timestep = timestep
        self.execution_state = ExecutionState.RUNNING

    def _restore_workflow_state(self, workflow_snapshot: dict) -> None:
        """Update workflow task and resource states from snapshot."""
        # Update task states
        tasks_data = workflow_snapshot.get("tasks", {})
        for task_id_str, task_data in tasks_data.items():
            task_id = UUID(task_id_str)
            if task_id in self.workflow.tasks:
                task = self.workflow.tasks[task_id]
                # Update key state information
                task.status = TaskStatus(task_data["status"])
                task.assigned_agent_id = task_data.get("assigned_agent_id")
                task.actual_duration_hours = task_data.get("actual_duration_hours")
                task.actual_cost = task_data.get("actual_cost")
                task.quality_score = task_data.get("quality_score")
                if task_data.get("started_at"):
                    task.started_at = datetime.fromisoformat(task_data["started_at"])
                if task_data.get("completed_at"):
                    task.completed_at = datetime.fromisoformat(
                        task_data["completed_at"]
                    )
                task.execution_notes = task_data.get("execution_notes", [])

        # Synchronize embedded subtasks with the updated registry to fix status inconsistencies
        for task in self.workflow.tasks.values():
            task.sync_embedded_tasks_with_registry(self.workflow.tasks)

        # Update workflow-level state
        self.workflow.total_cost = workflow_snapshot.get("total_cost", 0.0)
        if workflow_snapshot.get("started_at"):
            self.workflow.started_at = datetime.fromisoformat(
                workflow_snapshot["started_at"]
            )
        if workflow_snapshot.get("completed_at"):
            self.workflow.completed_at = datetime.fromisoformat(
                workflow_snapshot["completed_at"]
            )
        self.workflow.is_active = workflow_snapshot.get("is_active", True)

    def _restore_stakeholder_preferences(self, prefs_data: dict) -> None:
        """Update stakeholder agent preferences to match snapshot."""
        self.stakeholder_agent.apply_preference_change(
            timestep=self.current_timestep,
            new_weights=PreferenceWeights(
                preferences=[
                    Preference(name=key, weight=value)
                    for key, value in prefs_data["weights"].items()
                ]
            ),
            change_event=None,
        )

    def _restore_communication_history(self, messages: list) -> None:
        """Restore communication message history from snapshot."""
        for msg_data in messages:
            message = Message(
                message_id=UUID(msg_data["message_id"]),
                sender_id=msg_data["sender_id"],
                receiver_id=msg_data.get("receiver_id"),
                recipients=msg_data.get("recipients", []),
                content=msg_data["content"],
                message_type=MessageType(msg_data["message_type"]),
                timestamp=datetime.fromisoformat(msg_data["timestamp"]),
                thread_id=UUID(msg_data["thread_id"])
                if msg_data.get("thread_id")
                else None,
                parent_message_id=UUID(msg_data["parent_message_id"])
                if msg_data.get("parent_message_id")
                else None,
                related_task_id=UUID(msg_data["related_task_id"])
                if msg_data.get("related_task_id")
                else None,
                priority=msg_data.get("priority", 1),
                read_by={
                    agent_id: datetime.fromisoformat(read_time)
                    for agent_id, read_time in msg_data.get("read_by", {}).items()
                },
                metadata=msg_data.get("metadata", {}),
            )

            # Add message to communication service
            self.communication_service.graph.add_message(message)

    def _get_preferences_from_stakeholder_agent(
        self, timestep: int
    ) -> PreferenceWeights:
        """Resolve stakeholder-owned preferences for the given timestep."""

        return self.stakeholder_agent.get_preferences_for_timestep(timestep)

    async def run_full_execution(
        self, save_outputs: bool = True
    ) -> list[ExecutionResult]:
        """
        Run the complete workflow execution until completion or failure.

        Returns:
            List of timestep results from the execution
        """
        self.manager_agent.set_max_timesteps(self.max_timesteps)
        self.execution_state = ExecutionState.RUNNING

        # Structured concurrency: all validation tasks run under a TaskGroup and
        # must complete before exiting this context
        async with TaskGroup() as tg:
            self._task_group = tg

            while (
                not self._is_terminal_state()
                and self.current_timestep < self.max_timesteps
            ):
                timestep_result = await self.execute_timestep()
                self.timestep_results.append(timestep_result)

                # Save workflow state after each timestep
                await self._save_workflow_state(timestep_result)

                # Check for completion
                if self.workflow.is_complete():
                    self.execution_state = ExecutionState.COMPLETED
                    break

                # Check for explicit end request from agents
                try:
                    if (
                        self.communication_service is not None
                        and self.communication_service.is_end_workflow_requested()
                    ):
                        self.execution_state = ExecutionState.CANCELLED
                        break
                except Exception:
                    logger.error("Error checking end-workflow request", exc_info=True)

            # Check if we hit the timestep limit without completing
            if (
                self.current_timestep >= self.max_timesteps
                and not self.workflow.is_complete()
            ):
                self.execution_state = ExecutionState.FAILED

        # Exiting TaskGroup ensures all scheduled validations completed
        self._task_group = None

        # Run final evaluation set
        communications_sender = (
            self.communication_service.get_messages_grouped_by_sender(
                sort_within_group="time",
                include_broadcasts=True,
            )
        )
        comms_by_sender: list[SenderMessagesView] = cast(
            list[SenderMessagesView], communications_sender
        )
        manager_actions = self.manager_agent.get_action_buffer()
        await self.validation_engine.evaluate_timestep(
            workflow=self.workflow,
            timestep=self.current_timestep,
            preferences=self._get_preferences_from_stakeholder_agent(
                self.current_timestep
            ),
            workflow_evaluators=self.evaluations,
            cadence=RunCondition.ON_COMPLETION,
            communications=comms_by_sender,
            manager_actions=manager_actions,
        )

        if save_outputs:
            self.serialise_workflow_states_and_metrics()

        return self.timestep_results

    async def execute_timestep(self) -> ExecutionResult:
        """
        Execute a single timestep of the workflow.

        Returns:
            ExecutionResult with details of what happened
        """
        if not self.manager_agent:
            raise ValueError("Manager agent not configured")

        start_time = datetime.now()
        timestep = self.current_timestep

        agent_coordination_changes = self._check_and_apply_agent_changes()

        manager_action = None
        if self.manager_agent:
            self.execution_state = ExecutionState.WAITING_FOR_MANAGER
            # Unified RL-style step: agent constructs observation internally
            done_flag = self._is_terminal_state() or self.workflow.is_complete()
            manager_action = await self.manager_agent.step(
                workflow=self.workflow,
                execution_state=self.execution_state,
                current_timestep=self.current_timestep,
                running_tasks=self.running_tasks,
                completed_task_ids=self.completed_task_ids,
                failed_task_ids=self.failed_task_ids,
                communication_service=self.communication_service,
                previous_reward=self.validation_engine.most_recent_reward,
                done=done_flag,
                stakeholder_profile=self.stakeholder_agent.public_profile,
            )
            try:
                action_result = await manager_action.execute(
                    self.workflow, self.communication_service
                )
            except Exception:
                logger.error("failed to execute manager action", exc_info=True)
                action_result = None

            # Delegate action logging to the manager agent hook
            self.manager_agent.on_action_executed(
                timestep=timestep,
                action=manager_action,
                action_result=action_result,
            )

        self.execution_state = ExecutionState.EXECUTING_TASKS
        tasks_started, tasks_completed, tasks_failed = await self._execute_ready_tasks()

        self._update_workflow_state(tasks_completed, tasks_failed)

        # Evaluate preferences for this timestep if configured
        did_eval_this_step = False
        if self.evaluation_cadence in (
            RunCondition.EACH_TIMESTEP,
            RunCondition.BOTH,
        ):
            communications_sender = (
                self.communication_service.get_messages_grouped_by_sender(
                    sort_within_group="time",
                    include_broadcasts=True,
                )
            )
            comms_by_sender: list[SenderMessagesView] = cast(
                list[SenderMessagesView], communications_sender
            )
            manager_actions = self.manager_agent.get_action_buffer()
            await self.validation_engine.evaluate_timestep(
                workflow=self.workflow,
                timestep=self.current_timestep,
                preferences=self._get_preferences_from_stakeholder_agent(
                    self.current_timestep
                ),
                workflow_evaluators=self.evaluations,
                cadence=RunCondition.EACH_TIMESTEP,
                communications=comms_by_sender,
                manager_actions=manager_actions,
            )
            did_eval_this_step = True

        # If cadence is not EACH_TIMESTEP/BOTH, still evaluate on selected timesteps only
        elif (
            self.validation_engine.selected_timesteps
            and self.current_timestep in self.validation_engine.selected_timesteps
        ):
            communications_sender = (
                self.communication_service.get_messages_grouped_by_sender(
                    sort_within_group="time",
                    include_broadcasts=True,
                )
            )
            comms_by_sender = cast(list[SenderMessagesView], communications_sender)
            manager_actions = self.manager_agent.get_action_buffer()
            await self.validation_engine.evaluate_timestep(
                workflow=self.workflow,
                timestep=self.current_timestep,
                preferences=self._get_preferences_from_stakeholder_agent(
                    self.current_timestep
                ),
                workflow_evaluators=self.evaluations,
                cadence=RunCondition.EACH_TIMESTEP,
                communications=comms_by_sender,
                manager_actions=manager_actions,
            )
            did_eval_this_step = True
        # Ensure reward vector has an entry for this timestep even if no evals were run
        if not did_eval_this_step:
            rv = self.validation_engine.reward_vector
            if len(rv) <= self.current_timestep:
                rv.extend([0.0] * (self.current_timestep + 1 - len(rv)))

        # Run stakeholder policy step
        await self.stakeholder_agent.policy_step(self.current_timestep)

        execution_time = (datetime.now() - start_time).total_seconds()

        # Capture stakeholder preference state for this timestep
        # Serialize a safe snapshot that avoids dumping callable fields inside evaluators
        current_weights = self._get_preferences_from_stakeholder_agent(
            self.current_timestep
        )
        safe_stakeholder_pref_state = {
            "timestep": self.current_timestep,
            "weights": current_weights.get_preference_dict(),
            "preference_names": current_weights.get_preference_names(),
            "change_event": (
                self.recent_preference_change.model_dump(mode="json")
                if self.recent_preference_change is not None
                else None
            ),
        }

        # Build observation for outputs/callbacks
        # Note: this is a duplicative secondary observation, which is not used by the manager agent directly
        # TODO: move this around so we expose the last observation
        observation = await self.manager_agent.create_observation(
            workflow=self.workflow,
            execution_state=self.execution_state,
            current_timestep=self.current_timestep,
            running_tasks=self.running_tasks,
            completed_task_ids=self.completed_task_ids,
            failed_task_ids=self.failed_task_ids,
            communication_service=self.communication_service,
            stakeholder_profile=self.stakeholder_agent.public_profile,
        )

        # Calculate total simulated time from completed tasks in this timestep
        total_simulated_hours = 0.0
        for task_id in tasks_completed:
            task = self.workflow.tasks.get(task_id)
            if task and task.actual_duration_hours is not None:
                total_simulated_hours += task.actual_duration_hours

        result = create_timestep_result(
            timestep=timestep,
            manager_id="workflow_engine",
            tasks_started=tasks_started,
            tasks_completed=tasks_completed,
            tasks_failed=tasks_failed,
            execution_time=execution_time,
            completed_tasks_simulated_hours=total_simulated_hours,
            manager_action=manager_action,
            manager_observation=observation,
            workflow_snapshot={
                **self.workflow.model_dump(
                    mode="json", exclude={"agents", "success_criteria"}
                ),
                "agents": self.output_writer._serialize_agents_for_snapshot(),
                "success_criteria": [],
            },
            preference_change_event=self.recent_preference_change,
            agent_coordination_changes=agent_coordination_changes,
            stakeholder_preference_state=safe_stakeholder_pref_state,
            # operational efficiency metrics are computed by evaluators
        )

        # Fire end-of-timestep callbacks with full context, without blocking engine on failures
        if self._timestep_end_callbacks:
            ctx = TimestepEndContext(
                timestep=timestep,
                execution_state=self.execution_state,
                workflow=self.workflow,
                manager_observation=observation,
                manager_action=manager_action,
                tasks_started=tasks_started,
                tasks_completed=tasks_completed,
                tasks_failed=tasks_failed,
                running_task_ids=list(self.running_tasks.keys()),
                completed_task_ids=list(self.completed_task_ids),
                failed_task_ids=list(self.failed_task_ids),
                preference_change_event=self.recent_preference_change,
                agent_coordination_changes=agent_coordination_changes,
                execution_time_seconds=execution_time,
                execution_result=result,
            )
            for cb in self._timestep_end_callbacks:
                try:
                    await cb(ctx)
                except Exception:
                    logger.error(
                        f"timestep_end callback failed: {traceback.format_exc()}"
                    )

        self.current_timestep += 1
        return result

    def _inject_communication_service(self) -> None:
        """Inject communication service into all agents in the workflow."""
        for agent in self.workflow.agents.values():
            # Simple: just set the communication service reference
            # Agents are responsible for using it properly in their tools
            if isinstance(agent, AgentInterface):
                agent.communication_service = self.communication_service
                agent.configure_seed(self.seed)

    async def _execute_ready_tasks(self) -> tuple[list[UUID], list[UUID], list[UUID]]:
        """
        Execute all tasks that are ready to start.

        Returns:
            Tuple of (tasks_started, tasks_completed, tasks_failed)
        """
        tasks_started = []
        tasks_completed = []
        tasks_failed = []

        if self.running_tasks:
            done_tasks, pending_tasks = await asyncio.wait(
                self.running_tasks.values(),
                return_when=asyncio.ALL_COMPLETED,
                timeout=300,
            )

            for done_task in done_tasks:
                task_id = None
                for tid, atask in self.running_tasks.items():
                    if atask == done_task:
                        task_id = tid
                        break

                if task_id:
                    try:
                        result = await done_task
                        if result.success:
                            tasks_completed.append(task_id)
                            self.completed_task_ids.add(task_id)

                            resource_ids = []
                            new_resources = []
                            for resource in result.output_resources:
                                self.workflow.add_resource(resource)
                                resource_ids.append(resource.id)
                                new_resources.append(resource)

                            # Validation system removed; skip resource validations

                            existing_task = self.workflow.tasks.get(task_id)
                            if existing_task is not None:
                                completed_task = existing_task.model_copy(
                                    update={
                                        "status": TaskStatus.COMPLETED,
                                        "completed_at": result.completed_at,
                                        "actual_duration_hours": float(
                                            result.simulated_duration_hours
                                        ),
                                        "actual_cost": result.actual_cost,
                                        "output_resource_ids": existing_task.output_resource_ids
                                        + resource_ids,
                                    }
                                )
                                self.workflow.tasks[task_id] = completed_task
                                # Synchronize embedded subtasks with updated registry to fix inconsistencies
                                for sync_task in self.workflow.tasks.values():
                                    sync_task.sync_embedded_tasks_with_registry(
                                        self.workflow.tasks
                                    )
                                self.workflow.total_simulated_hours += float(
                                    result.simulated_duration_hours
                                )
                                self.workflow.total_cost += float(result.actual_cost)

                            else:
                                logger.warning(
                                    f"Completed task {task_id} no longer exists in workflow (possibly removed). Skipping state update."
                                )

                        else:
                            tasks_failed.append(task_id)
                            self.failed_task_ids.add(task_id)

                            task = self.workflow.tasks.get(task_id)
                            if task is not None:
                                task.status = TaskStatus.FAILED
                                task.execution_notes.append(
                                    f"Failed: {result.error_message}"
                                )
                                # Synchronize embedded subtasks with updated registry to fix inconsistencies
                                for sync_task in self.workflow.tasks.values():
                                    sync_task.sync_embedded_tasks_with_registry(
                                        self.workflow.tasks
                                    )
                            else:
                                logger.warning(
                                    f"Failed task {task_id} no longer exists in workflow (possibly removed). Skipping state update."
                                )

                    except Exception as e:
                        logger.error(
                            f"Task {task_id} failed with exception: {traceback.format_exc()}"
                        )
                        tasks_failed.append(task_id)
                        self.failed_task_ids.add(task_id)

                        # Update task status if task still exists
                        task = self.workflow.tasks.get(task_id)
                        if task is not None:
                            task.status = TaskStatus.FAILED
                            task.execution_notes.append(f"Exception: {str(e)}")
                        else:
                            logger.warning(
                                f"Exception for task {task_id}, but task no longer exists in workflow. Skipping state update."
                            )

                    del self.running_tasks[task_id]

        # Start new tasks that are ready
        ready_tasks = self.workflow.get_ready_tasks()

        for task in ready_tasks:
            if (
                task.id not in self.running_tasks
                and task.id not in self.completed_task_ids
            ):
                # Mark when dependencies became ready (for coordination deadtime calculation)
                if task.deps_ready_at is None:
                    task.deps_ready_at = datetime.now()

                # Get assigned agent
                agent = None
                if task.assigned_agent_id:
                    agent = self.workflow.agents.get(task.assigned_agent_id)

                if agent:
                    # Get task resources
                    resources = self._get_task_resources(task)

                    # Start task execution
                    execution_task = asyncio.create_task(
                        agent.execute_task(task, resources)
                    )
                    self.running_tasks[task.id] = execution_task

                    # Update task status and timing
                    # READY -> RUNNING when the engine actually starts execution
                    task.status = TaskStatus.RUNNING
                    task.started_at = datetime.now()

                    tasks_started.append(task.id)

        return tasks_started, tasks_completed, tasks_failed

    def _get_task_resources(self, task: Task) -> list[Resource]:
        """
        Get input resources for a task.

        Args:
            task: The task to get resources for

        Returns:
            List of available input resources
        """
        # Gather input resources
        input_resources = []
        for resource_id in task.input_resource_ids:
            if resource_id in self.workflow.resources:
                input_resources.append(self.workflow.resources[resource_id])
        return input_resources

    def _update_workflow_state(
        self, completed_tasks: list[UUID], failed_tasks: list[UUID]
    ) -> None:
        """
        Update workflow state after task completions.

        Args:
            completed_tasks: List of completed task IDs
            failed_tasks: List of failed task IDs
        """
        # Update agent states
        for agent in self.workflow.agents.values():
            # Remove completed/failed tasks from current task list
            agent.current_task_ids = [
                tid
                for tid in agent.current_task_ids
                if tid not in completed_tasks and tid not in failed_tasks
            ]

            # Update performance metrics
            if any(tid in completed_tasks for tid in agent.current_task_ids):
                agent.tasks_completed += len(
                    [tid for tid in completed_tasks if tid in agent.current_task_ids]
                )

        # Update workflow timestamps
        if self.workflow.started_at is None and (completed_tasks or self.running_tasks):
            self.workflow.started_at = datetime.now()

        if self.workflow.is_complete() and self.workflow.completed_at is None:
            self.workflow.completed_at = datetime.now()

        # Propagate completion to all composite tasks whose atomic subtasks are all completed (recursive)
        try:
            status_updated = False

            def _update_composite_completion(node: Task) -> None:
                nonlocal status_updated
                if not node.is_atomic_task():
                    atomic_children = node.get_atomic_subtasks()

                    def _is_leaf_completed(leaf: Task) -> bool:
                        current = self.workflow.tasks.get(leaf.id)
                        return (
                            current and current.status == TaskStatus.COMPLETED
                        ) or leaf.status == TaskStatus.COMPLETED

                    if atomic_children and all(
                        _is_leaf_completed(leaf) for leaf in atomic_children
                    ):
                        if node.status != TaskStatus.COMPLETED:
                            node.status = TaskStatus.COMPLETED
                            node.completed_at = datetime.now()
                            status_updated = True
                    # Recurse into children for nested composites
                    for child in node.subtasks:
                        _update_composite_completion(child)

            for top in self.workflow.tasks.values():
                _update_composite_completion(top)

            # Synchronize embedded subtasks if any status was updated to fix inconsistencies
            if status_updated:
                for task in self.workflow.tasks.values():
                    task.sync_embedded_tasks_with_registry(self.workflow.tasks)

            # Normalize composite task states: composites should never be READY/RUNNING
            for task in self.workflow.tasks.values():
                try:
                    if not task.is_atomic_task() and task.status in (
                        TaskStatus.READY,
                        TaskStatus.RUNNING,
                    ):
                        task.status = TaskStatus.PENDING
                except Exception:
                    continue

            # Update derived effective_status for all tasks (including embedded composites)
            def _leaf_statuses(node: Task) -> list[TaskStatus]:
                if node.is_atomic_task():
                    reg = self.workflow.tasks.get(node.id)
                    return [reg.status if reg is not None else node.status]
                statuses: list[TaskStatus] = []
                for leaf in node.get_atomic_subtasks():
                    reg = self.workflow.tasks.get(leaf.id)
                    statuses.append(reg.status if reg is not None else leaf.status)
                return statuses

            def _set_effective_status_recursive(node: Task) -> None:
                try:
                    if node.is_atomic_task():
                        node.effective_status = node.status.value
                    else:
                        leaves = _leaf_statuses(node)
                        if leaves and all(s == TaskStatus.COMPLETED for s in leaves):
                            node.effective_status = TaskStatus.COMPLETED.value
                        elif any(s == TaskStatus.RUNNING for s in leaves):
                            node.effective_status = TaskStatus.RUNNING.value
                        elif any(s == TaskStatus.READY for s in leaves):
                            node.effective_status = TaskStatus.READY.value
                        else:
                            node.effective_status = TaskStatus.PENDING.value
                    # Recurse into children so embedded composites get their own value
                    for child in node.subtasks:
                        _set_effective_status_recursive(child)
                except Exception:
                    node.effective_status = node.status.value

            for root in self.workflow.tasks.values():
                _set_effective_status_recursive(root)
        except Exception:
            # Non-fatal; composite completion is a quality-of-life enhancement
            logger.error("Composite completion propagation failed", exc_info=True)

    def _is_terminal_state(self) -> bool:
        """Check if execution is in a terminal state."""
        return self.execution_state in [
            ExecutionState.COMPLETED,
            ExecutionState.FAILED,
            ExecutionState.CANCELLED,
        ]

    async def _save_workflow_state(self, timestep_result: ExecutionResult) -> None:
        """
        Save workflow state to disk.

        Args:
            timestep_result: The timestep result to save
        """
        if not self.enable_timestep_logging:
            return

        timestep = timestep_result.metadata.get("timestep", 0)
        filepath = self.output_config.get_timestep_file_path(timestep)

        # Ensure parent directory exists (robust against parallel runs)
        filepath.parent.mkdir(parents=True, exist_ok=True)

        # Delegate writing to OutputWriter
        try:
            self.output_writer.save_timestep(
                timestep_result=timestep_result,
                workflow=self.workflow,
                current_timestep=timestep,
                manager_agent=self.manager_agent,
                stakeholder_weights=self._get_preferences_from_stakeholder_agent(
                    timestep
                ),
            )
        except Exception:
            logger.error("output writer failed saving timestep", exc_info=True)

        metrics = {
            "execution_summary": {
                "total_timesteps": self.current_timestep,
                "total_tasks": len(self.workflow.tasks),
                "completed_tasks": len(self.completed_task_ids),
                "failed_tasks": len(self.failed_task_ids),
                "success_rate": len(self.completed_task_ids) / len(self.workflow.tasks)
                if self.workflow.tasks
                else 0.0,
                "execution_state": self.execution_state.value,
                "workflow_completed": self.workflow.is_complete(),
            },
            "timing": {
                "started_at": self.workflow.started_at.isoformat()
                if self.workflow.started_at
                else None,
                "completed_at": self.workflow.completed_at.isoformat()
                if self.workflow.completed_at
                else None,
                "total_execution_time_seconds": sum(
                    tr.execution_time_seconds for tr in self.timestep_results
                ),
            },
            "timestep_results": [
                tr.model_dump(mode="json") for tr in self.timestep_results
            ],
        }

        filepath = self.output_config.get_final_metrics_path()
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, "w") as f:
            json.dump(metrics, f, indent=2, default=str)

    def serialise_workflow_states_and_metrics(self) -> None:
        """Write high-level execution logs (manager actions) into execution_logs directory."""
        try:
            briefs = self.manager_agent.get_action_buffer()
            converted: list[tuple[int, ActionResult | None]] = [
                (b.timestep, b) for b in briefs if b.timestep is not None
            ]
            self.output_writer.save_execution_logs(converted)

        except Exception:
            logger.error("output writer failed saving execution logs", exc_info=True)

        try:
            self.output_writer.save_evaluation_outputs(
                self.validation_engine.evaluation_results,
                reward_vector=self.validation_engine.reward_vector,
            )
        except Exception:
            logger.error(
                "output writer failed saving evaluation outputs", exc_info=True
            )
        try:
            self.output_writer.save_workflow_summary(
                workflow=self.workflow,
                completed_task_ids=self.completed_task_ids,
                failed_task_ids=self.failed_task_ids,
                current_timestep=self.current_timestep,
            )
        except Exception:
            logger.error("output writer failed saving workflow summary", exc_info=True)

    def get_current_workflow_state(self) -> Workflow:
        """
        Get a snapshot of the current workflow state for external evaluation.

        This enables decoupled evaluation where external evaluators can
        assess the current state without being tightly coupled to the engine.

        Returns:
            Current workflow state with all tasks, resources, and metadata
        """
        return self.workflow

    def get_current_execution_context(self) -> dict:
        """
        Get current execution context for evaluation.

        Returns execution metadata that evaluators might need.
        """
        return {
            "timestep": self.current_timestep,
            "execution_state": self.execution_state.value,
            "completed_task_ids": list(self.completed_task_ids),
            "failed_task_ids": list(self.failed_task_ids),
            "running_tasks": len(self.running_tasks),
        }

    def _check_and_apply_agent_changes(self) -> list[str]:
        """
        Check if agents should change and apply changes if needed.

        Returns:
            List of change descriptions for logging
        """
        changes: list[str] = []

        changes = self.agent_registry.apply_scheduled_changes_for_timestep(
            timestep=self.current_timestep,
            communication_service=self.communication_service,
            tool_factory=ToolFactory(),
        )

        # Mirror registry agents into workflow so observations/assignments can see them
        try:
            # 1) Add or update agents that exist in the registry
            current_registry_agents = {
                a.agent_id: a for a in self.agent_registry.list_agents()
            }
            for agent in current_registry_agents.values():
                self.workflow.add_agent(agent)
            # Update mirrored set to current registry snapshot
            current_registry_ids = set(current_registry_agents.keys())

            # 2) Prune only previously mirrored agents that are no longer in the registry
            #    Keep the stakeholder agent which is owned by the engine/workflow
            stakeholder_id = self.stakeholder_agent.agent_id
            previously_mirrored = set(self._registry_mirrored_agent_ids)
            to_remove = [
                agent_id
                for agent_id in previously_mirrored
                if agent_id != stakeholder_id
                and agent_id not in current_registry_ids
                and agent_id in self.workflow.agents
            ]
            for agent_id in to_remove:
                try:
                    del self.workflow.agents[agent_id]
                except Exception:
                    # Defensive: continue even if an entry cannot be deleted
                    pass

            # 3) Commit mirrored set to current for next timestep
            self._registry_mirrored_agent_ids = current_registry_ids
        except Exception:
            logger.error(
                "failed to sync agent registry into workflow agents", exc_info=True
            )

        return changes

_check_and_apply_agent_changes() -> list[str]

Check if agents should change and apply changes if needed.

Returns:

- list[str]: List of change descriptions for logging.

Source code in manager_agent_gym/core/execution/engine.py
def _check_and_apply_agent_changes(self) -> list[str]:
    """
    Check if agents should change and apply changes if needed.

    Returns:
        List of change descriptions for logging
    """
    changes: list[str] = []

    changes = self.agent_registry.apply_scheduled_changes_for_timestep(
        timestep=self.current_timestep,
        communication_service=self.communication_service,
        tool_factory=ToolFactory(),
    )

    # Mirror registry agents into workflow so observations/assignments can see them
    try:
        # 1) Add or update agents that exist in the registry
        current_registry_agents = {
            a.agent_id: a for a in self.agent_registry.list_agents()
        }
        for agent in current_registry_agents.values():
            self.workflow.add_agent(agent)
        # Update mirrored set to current registry snapshot
        current_registry_ids = set(current_registry_agents.keys())

        # 2) Prune only previously mirrored agents that are no longer in the registry
        #    Keep the stakeholder agent which is owned by the engine/workflow
        stakeholder_id = self.stakeholder_agent.agent_id
        previously_mirrored = set(self._registry_mirrored_agent_ids)
        to_remove = [
            agent_id
            for agent_id in previously_mirrored
            if agent_id != stakeholder_id
            and agent_id not in current_registry_ids
            and agent_id in self.workflow.agents
        ]
        for agent_id in to_remove:
            try:
                del self.workflow.agents[agent_id]
            except Exception:
                # Defensive: continue even if an entry cannot be deleted
                pass

        # 3) Commit mirrored set to current for next timestep
        self._registry_mirrored_agent_ids = current_registry_ids
    except Exception:
        logger.error(
            "failed to sync agent registry into workflow agents", exc_info=True
        )

    return changes

async _execute_ready_tasks() -> tuple[list[UUID], list[UUID], list[UUID]]

Execute all tasks that are ready to start.

Returns:

- tuple[list[UUID], list[UUID], list[UUID]]: Tuple of (tasks_started, tasks_completed, tasks_failed).

Source code in manager_agent_gym/core/execution/engine.py
async def _execute_ready_tasks(self) -> tuple[list[UUID], list[UUID], list[UUID]]:
    """
    Execute all tasks that are ready to start.

    Returns:
        Tuple of (tasks_started, tasks_completed, tasks_failed)
    """
    tasks_started = []
    tasks_completed = []
    tasks_failed = []

    if self.running_tasks:
        done_tasks, pending_tasks = await asyncio.wait(
            self.running_tasks.values(),
            return_when=asyncio.ALL_COMPLETED,
            timeout=300,
        )

        for done_task in done_tasks:
            task_id = None
            for tid, atask in self.running_tasks.items():
                if atask == done_task:
                    task_id = tid
                    break

            if task_id:
                try:
                    result = await done_task
                    if result.success:
                        tasks_completed.append(task_id)
                        self.completed_task_ids.add(task_id)

                        resource_ids = []
                        new_resources = []
                        for resource in result.output_resources:
                            self.workflow.add_resource(resource)
                            resource_ids.append(resource.id)
                            new_resources.append(resource)

                        # Validation system removed; skip resource validations

                        existing_task = self.workflow.tasks.get(task_id)
                        if existing_task is not None:
                            completed_task = existing_task.model_copy(
                                update={
                                    "status": TaskStatus.COMPLETED,
                                    "completed_at": result.completed_at,
                                    "actual_duration_hours": float(
                                        result.simulated_duration_hours
                                    ),
                                    "actual_cost": result.actual_cost,
                                    "output_resource_ids": existing_task.output_resource_ids
                                    + resource_ids,
                                }
                            )
                            self.workflow.tasks[task_id] = completed_task
                            # Synchronize embedded subtasks with updated registry to fix inconsistencies
                            for sync_task in self.workflow.tasks.values():
                                sync_task.sync_embedded_tasks_with_registry(
                                    self.workflow.tasks
                                )
                            self.workflow.total_simulated_hours += float(
                                result.simulated_duration_hours
                            )
                            self.workflow.total_cost += float(result.actual_cost)

                        else:
                            logger.warning(
                                f"Completed task {task_id} no longer exists in workflow (possibly removed). Skipping state update."
                            )

                    else:
                        tasks_failed.append(task_id)
                        self.failed_task_ids.add(task_id)

                        task = self.workflow.tasks.get(task_id)
                        if task is not None:
                            task.status = TaskStatus.FAILED
                            task.execution_notes.append(
                                f"Failed: {result.error_message}"
                            )
                            # Synchronize embedded subtasks with updated registry to fix inconsistencies
                            for sync_task in self.workflow.tasks.values():
                                sync_task.sync_embedded_tasks_with_registry(
                                    self.workflow.tasks
                                )
                        else:
                            logger.warning(
                                f"Failed task {task_id} no longer exists in workflow (possibly removed). Skipping state update."
                            )

                except Exception as e:
                    logger.error(
                        f"Task {task_id} failed with exception: {traceback.format_exc()}"
                    )
                    tasks_failed.append(task_id)
                    self.failed_task_ids.add(task_id)

                    # Update task status if task still exists
                    task = self.workflow.tasks.get(task_id)
                    if task is not None:
                        task.status = TaskStatus.FAILED
                        task.execution_notes.append(f"Exception: {str(e)}")
                    else:
                        logger.warning(
                            f"Exception for task {task_id}, but task no longer exists in workflow. Skipping state update."
                        )

                del self.running_tasks[task_id]

    # Start new tasks that are ready
    ready_tasks = self.workflow.get_ready_tasks()

    for task in ready_tasks:
        if (
            task.id not in self.running_tasks
            and task.id not in self.completed_task_ids
        ):
            # Mark when dependencies became ready (for coordination deadtime calculation)
            if task.deps_ready_at is None:
                task.deps_ready_at = datetime.now()

            # Get assigned agent
            agent = None
            if task.assigned_agent_id:
                agent = self.workflow.agents.get(task.assigned_agent_id)

            if agent:
                # Get task resources
                resources = self._get_task_resources(task)

                # Start task execution
                execution_task = asyncio.create_task(
                    agent.execute_task(task, resources)
                )
                self.running_tasks[task.id] = execution_task

                # Update task status and timing
                # READY -> RUNNING when the engine actually starts execution
                task.status = TaskStatus.RUNNING
                task.started_at = datetime.now()

                tasks_started.append(task.id)

    return tasks_started, tasks_completed, tasks_failed

_get_preferences_from_stakeholder_agent(timestep: int) -> PreferenceWeights

Resolve stakeholder-owned preferences for the given timestep.

Source code in manager_agent_gym/core/execution/engine.py
def _get_preferences_from_stakeholder_agent(
    self, timestep: int
) -> PreferenceWeights:
    """Resolve stakeholder-owned preferences for the given timestep."""

    return self.stakeholder_agent.get_preferences_for_timestep(timestep)

_get_task_resources(task: Task) -> list[Resource]

Get input resources for a task.

Parameters:

Name Type Description Default
task Task

The task to get resources for

required

Returns:

Type Description
list[Resource]

List of available input resources

Source code in manager_agent_gym/core/execution/engine.py
def _get_task_resources(self, task: Task) -> list[Resource]:
    """
    Get input resources for a task.

    Args:
        task: The task to get resources for

    Returns:
        List of available input resources
    """
    # Gather input resources
    input_resources = []
    for resource_id in task.input_resource_ids:
        if resource_id in self.workflow.resources:
            input_resources.append(self.workflow.resources[resource_id])
    return input_resources

_inject_communication_service() -> None

Inject communication service into all agents in the workflow.

Source code in manager_agent_gym/core/execution/engine.py
def _inject_communication_service(self) -> None:
    """Inject communication service into all agents in the workflow."""
    for agent in self.workflow.agents.values():
        # Simple: just set the communication service reference
        # Agents are responsible for using it properly in their tools
        if isinstance(agent, AgentInterface):
            agent.communication_service = self.communication_service
            agent.configure_seed(self.seed)

_is_terminal_state() -> bool

Check if execution is in a terminal state.

Source code in manager_agent_gym/core/execution/engine.py
def _is_terminal_state(self) -> bool:
    """Check if execution is in a terminal state."""
    return self.execution_state in [
        ExecutionState.COMPLETED,
        ExecutionState.FAILED,
        ExecutionState.CANCELLED,
    ]

_restore_communication_history(messages: list) -> None

Restore communication message history from snapshot.

Source code in manager_agent_gym/core/execution/engine.py
def _restore_communication_history(self, messages: list) -> None:
    """Restore communication message history from snapshot."""
    for msg_data in messages:
        message = Message(
            message_id=UUID(msg_data["message_id"]),
            sender_id=msg_data["sender_id"],
            receiver_id=msg_data.get("receiver_id"),
            recipients=msg_data.get("recipients", []),
            content=msg_data["content"],
            message_type=MessageType(msg_data["message_type"]),
            timestamp=datetime.fromisoformat(msg_data["timestamp"]),
            thread_id=UUID(msg_data["thread_id"])
            if msg_data.get("thread_id")
            else None,
            parent_message_id=UUID(msg_data["parent_message_id"])
            if msg_data.get("parent_message_id")
            else None,
            related_task_id=UUID(msg_data["related_task_id"])
            if msg_data.get("related_task_id")
            else None,
            priority=msg_data.get("priority", 1),
            read_by={
                agent_id: datetime.fromisoformat(read_time)
                for agent_id, read_time in msg_data.get("read_by", {}).items()
            },
            metadata=msg_data.get("metadata", {}),
        )

        # Add message to communication service
        self.communication_service.graph.add_message(message)

_restore_stakeholder_preferences(prefs_data: dict) -> None

Update stakeholder agent preferences to match snapshot.

Source code in manager_agent_gym/core/execution/engine.py
def _restore_stakeholder_preferences(self, prefs_data: dict) -> None:
    """Update stakeholder agent preferences to match snapshot."""
    self.stakeholder_agent.apply_preference_change(
        timestep=self.current_timestep,
        new_weights=PreferenceWeights(
            preferences=[
                Preference(name=key, weight=value)
                for key, value in prefs_data["weights"].items()
            ]
        ),
        change_event=None,
    )

_restore_workflow_state(workflow_snapshot: dict) -> None

Update workflow task and resource states from snapshot.

Source code in manager_agent_gym/core/execution/engine.py
def _restore_workflow_state(self, workflow_snapshot: dict) -> None:
    """Update workflow task and resource states from snapshot."""
    # Update task states
    tasks_data = workflow_snapshot.get("tasks", {})
    for task_id_str, task_data in tasks_data.items():
        task_id = UUID(task_id_str)
        if task_id in self.workflow.tasks:
            task = self.workflow.tasks[task_id]
            # Update key state information
            task.status = TaskStatus(task_data["status"])
            task.assigned_agent_id = task_data.get("assigned_agent_id")
            task.actual_duration_hours = task_data.get("actual_duration_hours")
            task.actual_cost = task_data.get("actual_cost")
            task.quality_score = task_data.get("quality_score")
            if task_data.get("started_at"):
                task.started_at = datetime.fromisoformat(task_data["started_at"])
            if task_data.get("completed_at"):
                task.completed_at = datetime.fromisoformat(
                    task_data["completed_at"]
                )
            task.execution_notes = task_data.get("execution_notes", [])

    # Synchronize embedded subtasks with the updated registry to fix status inconsistencies
    for task in self.workflow.tasks.values():
        task.sync_embedded_tasks_with_registry(self.workflow.tasks)

    # Update workflow-level state
    self.workflow.total_cost = workflow_snapshot.get("total_cost", 0.0)
    if workflow_snapshot.get("started_at"):
        self.workflow.started_at = datetime.fromisoformat(
            workflow_snapshot["started_at"]
        )
    if workflow_snapshot.get("completed_at"):
        self.workflow.completed_at = datetime.fromisoformat(
            workflow_snapshot["completed_at"]
        )
    self.workflow.is_active = workflow_snapshot.get("is_active", True)

_save_workflow_state(timestep_result: ExecutionResult) -> None async

Save workflow state to disk.

Parameters:

Name Type Description Default
timestep_result ExecutionResult

The timestep result to save

required
Source code in manager_agent_gym/core/execution/engine.py
async def _save_workflow_state(self, timestep_result: ExecutionResult) -> None:
    """
    Save workflow state to disk.

    Args:
        timestep_result: The timestep result to save
    """
    if not self.enable_timestep_logging:
        return

    timestep = timestep_result.metadata.get("timestep", 0)
    filepath = self.output_config.get_timestep_file_path(timestep)

    # Ensure parent directory exists (robust against parallel runs)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    # Delegate writing to OutputWriter
    try:
        self.output_writer.save_timestep(
            timestep_result=timestep_result,
            workflow=self.workflow,
            current_timestep=timestep,
            manager_agent=self.manager_agent,
            stakeholder_weights=self._get_preferences_from_stakeholder_agent(
                timestep
            ),
        )
    except Exception:
        logger.error("output writer failed saving timestep", exc_info=True)

    metrics = {
        "execution_summary": {
            "total_timesteps": self.current_timestep,
            "total_tasks": len(self.workflow.tasks),
            "completed_tasks": len(self.completed_task_ids),
            "failed_tasks": len(self.failed_task_ids),
            "success_rate": len(self.completed_task_ids) / len(self.workflow.tasks)
            if self.workflow.tasks
            else 0.0,
            "execution_state": self.execution_state.value,
            "workflow_completed": self.workflow.is_complete(),
        },
        "timing": {
            "started_at": self.workflow.started_at.isoformat()
            if self.workflow.started_at
            else None,
            "completed_at": self.workflow.completed_at.isoformat()
            if self.workflow.completed_at
            else None,
            "total_execution_time_seconds": sum(
                tr.execution_time_seconds for tr in self.timestep_results
            ),
        },
        "timestep_results": [
            tr.model_dump(mode="json") for tr in self.timestep_results
        ],
    }

    filepath = self.output_config.get_final_metrics_path()
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, "w") as f:
        json.dump(metrics, f, indent=2, default=str)

_update_workflow_state(completed_tasks: list[UUID], failed_tasks: list[UUID]) -> None

Update workflow state after task completions.

Parameters:

Name Type Description Default
completed_tasks list[UUID]

List of completed task IDs

required
failed_tasks list[UUID]

List of failed task IDs

required
Source code in manager_agent_gym/core/execution/engine.py
def _update_workflow_state(
    self, completed_tasks: list[UUID], failed_tasks: list[UUID]
) -> None:
    """
    Update workflow state after task completions.

    Args:
        completed_tasks: List of completed task IDs
        failed_tasks: List of failed task IDs
    """
    # Update agent states
    for agent in self.workflow.agents.values():
        # Remove completed/failed tasks from current task list
        agent.current_task_ids = [
            tid
            for tid in agent.current_task_ids
            if tid not in completed_tasks and tid not in failed_tasks
        ]

        # Update performance metrics
        if any(tid in completed_tasks for tid in agent.current_task_ids):
            agent.tasks_completed += len(
                [tid for tid in completed_tasks if tid in agent.current_task_ids]
            )

    # Update workflow timestamps
    if self.workflow.started_at is None and (completed_tasks or self.running_tasks):
        self.workflow.started_at = datetime.now()

    if self.workflow.is_complete() and self.workflow.completed_at is None:
        self.workflow.completed_at = datetime.now()

    # Propagate completion to all composite tasks whose atomic subtasks are all completed (recursive)
    try:
        status_updated = False

        def _update_composite_completion(node: Task) -> None:
            nonlocal status_updated
            if not node.is_atomic_task():
                atomic_children = node.get_atomic_subtasks()

                def _is_leaf_completed(leaf: Task) -> bool:
                    current = self.workflow.tasks.get(leaf.id)
                    return (
                        current and current.status == TaskStatus.COMPLETED
                    ) or leaf.status == TaskStatus.COMPLETED

                if atomic_children and all(
                    _is_leaf_completed(leaf) for leaf in atomic_children
                ):
                    if node.status != TaskStatus.COMPLETED:
                        node.status = TaskStatus.COMPLETED
                        node.completed_at = datetime.now()
                        status_updated = True
                # Recurse into children for nested composites
                for child in node.subtasks:
                    _update_composite_completion(child)

        for top in self.workflow.tasks.values():
            _update_composite_completion(top)

        # Synchronize embedded subtasks if any status was updated to fix inconsistencies
        if status_updated:
            for task in self.workflow.tasks.values():
                task.sync_embedded_tasks_with_registry(self.workflow.tasks)

        # Normalize composite task states: composites should never be READY/RUNNING
        for task in self.workflow.tasks.values():
            try:
                if not task.is_atomic_task() and task.status in (
                    TaskStatus.READY,
                    TaskStatus.RUNNING,
                ):
                    task.status = TaskStatus.PENDING
            except Exception:
                continue

        # Update derived effective_status for all tasks (including embedded composites)
        def _leaf_statuses(node: Task) -> list[TaskStatus]:
            if node.is_atomic_task():
                reg = self.workflow.tasks.get(node.id)
                return [reg.status if reg is not None else node.status]
            statuses: list[TaskStatus] = []
            for leaf in node.get_atomic_subtasks():
                reg = self.workflow.tasks.get(leaf.id)
                statuses.append(reg.status if reg is not None else leaf.status)
            return statuses

        def _set_effective_status_recursive(node: Task) -> None:
            try:
                if node.is_atomic_task():
                    node.effective_status = node.status.value
                else:
                    leaves = _leaf_statuses(node)
                    if leaves and all(s == TaskStatus.COMPLETED for s in leaves):
                        node.effective_status = TaskStatus.COMPLETED.value
                    elif any(s == TaskStatus.RUNNING for s in leaves):
                        node.effective_status = TaskStatus.RUNNING.value
                    elif any(s == TaskStatus.READY for s in leaves):
                        node.effective_status = TaskStatus.READY.value
                    else:
                        node.effective_status = TaskStatus.PENDING.value
                # Recurse into children so embedded composites get their own value
                for child in node.subtasks:
                    _set_effective_status_recursive(child)
            except Exception:
                node.effective_status = node.status.value

        for root in self.workflow.tasks.values():
            _set_effective_status_recursive(root)
    except Exception:
        # Non-fatal; composite completion is a quality-of-life enhancement
        logger.error("Composite completion propagation failed", exc_info=True)

execute_timestep() -> ExecutionResult async

Execute a single timestep of the workflow.

Returns:

Type Description
ExecutionResult

ExecutionResult with details of what happened
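
Example

A minimal sketch of stepping the engine manually instead of calling run_full_execution. This loop omits the bookkeeping run_full_execution performs (state transitions, per-timestep snapshot saving, and the final evaluation pass); engine is assumed to be a configured WorkflowExecutionEngine as in the class-level example:

engine.manager_agent.set_max_timesteps(engine.max_timesteps)
while engine.current_timestep < engine.max_timesteps and not engine.workflow.is_complete():
    result = await engine.execute_timestep()
    engine.timestep_results.append(result)  # mirror what run_full_execution does
    print(f"timestep {engine.current_timestep}: {len(engine.completed_task_ids)} tasks completed")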

Source code in manager_agent_gym/core/execution/engine.py
async def execute_timestep(self) -> ExecutionResult:
    """
    Execute a single timestep of the workflow.

    Returns:
        ExecutionResult with details of what happened
    """
    if not self.manager_agent:
        raise ValueError("Manager agent not configured")

    start_time = datetime.now()
    timestep = self.current_timestep

    agent_coordination_changes = self._check_and_apply_agent_changes()

    manager_action = None
    if self.manager_agent:
        self.execution_state = ExecutionState.WAITING_FOR_MANAGER
        # Unified RL-style step: agent constructs observation internally
        done_flag = self._is_terminal_state() or self.workflow.is_complete()
        manager_action = await self.manager_agent.step(
            workflow=self.workflow,
            execution_state=self.execution_state,
            current_timestep=self.current_timestep,
            running_tasks=self.running_tasks,
            completed_task_ids=self.completed_task_ids,
            failed_task_ids=self.failed_task_ids,
            communication_service=self.communication_service,
            previous_reward=self.validation_engine.most_recent_reward,
            done=done_flag,
            stakeholder_profile=self.stakeholder_agent.public_profile,
        )
        try:
            action_result = await manager_action.execute(
                self.workflow, self.communication_service
            )
        except Exception:
            logger.error("failed to execute manager action", exc_info=True)
            action_result = None

        # Delegate action logging to the manager agent hook
        self.manager_agent.on_action_executed(
            timestep=timestep,
            action=manager_action,
            action_result=action_result,
        )

    self.execution_state = ExecutionState.EXECUTING_TASKS
    tasks_started, tasks_completed, tasks_failed = await self._execute_ready_tasks()

    self._update_workflow_state(tasks_completed, tasks_failed)

    # Evaluate preferences for this timestep if configured
    did_eval_this_step = False
    if self.evaluation_cadence in (
        RunCondition.EACH_TIMESTEP,
        RunCondition.BOTH,
    ):
        communications_sender = (
            self.communication_service.get_messages_grouped_by_sender(
                sort_within_group="time",
                include_broadcasts=True,
            )
        )
        comms_by_sender: list[SenderMessagesView] = cast(
            list[SenderMessagesView], communications_sender
        )
        manager_actions = self.manager_agent.get_action_buffer()
        await self.validation_engine.evaluate_timestep(
            workflow=self.workflow,
            timestep=self.current_timestep,
            preferences=self._get_preferences_from_stakeholder_agent(
                self.current_timestep
            ),
            workflow_evaluators=self.evaluations,
            cadence=RunCondition.EACH_TIMESTEP,
            communications=comms_by_sender,
            manager_actions=manager_actions,
        )
        did_eval_this_step = True

    # If cadence is not EACH_TIMESTEP/BOTH, still evaluate on selected timesteps only
    elif (
        self.validation_engine.selected_timesteps
        and self.current_timestep in self.validation_engine.selected_timesteps
    ):
        communications_sender = (
            self.communication_service.get_messages_grouped_by_sender(
                sort_within_group="time",
                include_broadcasts=True,
            )
        )
        comms_by_sender = cast(list[SenderMessagesView], communications_sender)
        manager_actions = self.manager_agent.get_action_buffer()
        await self.validation_engine.evaluate_timestep(
            workflow=self.workflow,
            timestep=self.current_timestep,
            preferences=self._get_preferences_from_stakeholder_agent(
                self.current_timestep
            ),
            workflow_evaluators=self.evaluations,
            cadence=RunCondition.EACH_TIMESTEP,
            communications=comms_by_sender,
            manager_actions=manager_actions,
        )
        did_eval_this_step = True
    # Ensure reward vector has an entry for this timestep even if no evals were run
    if not did_eval_this_step:
        rv = self.validation_engine.reward_vector
        if len(rv) <= self.current_timestep:
            rv.extend([0.0] * (self.current_timestep + 1 - len(rv)))

    # Run stakeholder policy step
    await self.stakeholder_agent.policy_step(self.current_timestep)

    execution_time = (datetime.now() - start_time).total_seconds()

    # Capture stakeholder preference state for this timestep
    # Serialize a safe snapshot that avoids dumping callable fields inside evaluators
    current_weights = self._get_preferences_from_stakeholder_agent(
        self.current_timestep
    )
    safe_stakeholder_pref_state = {
        "timestep": self.current_timestep,
        "weights": current_weights.get_preference_dict(),
        "preference_names": current_weights.get_preference_names(),
        "change_event": (
            self.recent_preference_change.model_dump(mode="json")
            if self.recent_preference_change is not None
            else None
        ),
    }

    # Build observation for outputs/callbacks
    # Note: this is a duplicative secondary observation; it is not used by the manager agent directly
    # TODO: move this around so we expose the last observation instead
    observation = await self.manager_agent.create_observation(
        workflow=self.workflow,
        execution_state=self.execution_state,
        current_timestep=self.current_timestep,
        running_tasks=self.running_tasks,
        completed_task_ids=self.completed_task_ids,
        failed_task_ids=self.failed_task_ids,
        communication_service=self.communication_service,
        stakeholder_profile=self.stakeholder_agent.public_profile,
    )

    # Calculate total simulated time from completed tasks in this timestep
    total_simulated_hours = 0.0
    for task_id in tasks_completed:
        task = self.workflow.tasks.get(task_id)
        if task and task.actual_duration_hours is not None:
            total_simulated_hours += task.actual_duration_hours

    result = create_timestep_result(
        timestep=timestep,
        manager_id="workflow_engine",
        tasks_started=tasks_started,
        tasks_completed=tasks_completed,
        tasks_failed=tasks_failed,
        execution_time=execution_time,
        completed_tasks_simulated_hours=total_simulated_hours,
        manager_action=manager_action,
        manager_observation=observation,
        workflow_snapshot={
            **self.workflow.model_dump(
                mode="json", exclude={"agents", "success_criteria"}
            ),
            "agents": self.output_writer._serialize_agents_for_snapshot(),
            "success_criteria": [],
        },
        preference_change_event=self.recent_preference_change,
        agent_coordination_changes=agent_coordination_changes,
        stakeholder_preference_state=safe_stakeholder_pref_state,
        # operational efficiency metrics are computed by evaluators
    )

    # Fire end-of-timestep callbacks with full context, without blocking engine on failures
    if self._timestep_end_callbacks:
        ctx = TimestepEndContext(
            timestep=timestep,
            execution_state=self.execution_state,
            workflow=self.workflow,
            manager_observation=observation,
            manager_action=manager_action,
            tasks_started=tasks_started,
            tasks_completed=tasks_completed,
            tasks_failed=tasks_failed,
            running_task_ids=list(self.running_tasks.keys()),
            completed_task_ids=list(self.completed_task_ids),
            failed_task_ids=list(self.failed_task_ids),
            preference_change_event=self.recent_preference_change,
            agent_coordination_changes=agent_coordination_changes,
            execution_time_seconds=execution_time,
            execution_result=result,
        )
        for cb in self._timestep_end_callbacks:
            try:
                await cb(ctx)
            except Exception:
                logger.error(
                    f"timestep_end callback failed: {traceback.format_exc()}"
                )

    self.current_timestep += 1
    return result

get_current_execution_context() -> dict

Get current execution context for evaluation.

Returns execution metadata that evaluators might need.
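
Example

A sketch of how an external evaluator might consume the returned dict (the keys shown are the ones this method returns):

ctx = engine.get_current_execution_context()
if ctx["running_tasks"] == 0 and not ctx["failed_task_ids"]:
    print(f"engine idle at timestep {ctx['timestep']} in state {ctx['execution_state']}")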

Source code in manager_agent_gym/core/execution/engine.py
def get_current_execution_context(self) -> dict:
    """
    Get current execution context for evaluation.

    Returns execution metadata that evaluators might need.
    """
    return {
        "timestep": self.current_timestep,
        "execution_state": self.execution_state.value,
        "completed_task_ids": list(self.completed_task_ids),
        "failed_task_ids": list(self.failed_task_ids),
        "running_tasks": len(self.running_tasks),
    }

get_current_workflow_state() -> Workflow

Get a snapshot of the current workflow state for external evaluation.

This enables decoupled evaluation where external evaluators can assess the current state without being tightly coupled to the engine.

Returns:

Type Description
Workflow

Current workflow state with all tasks, resources, and metadata
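
Example

A sketch of inspecting the workflow outside the engine (assumes TaskStatus is in scope; the progress computation is illustrative):

workflow = engine.get_current_workflow_state()
completed = sum(1 for t in workflow.tasks.values() if t.status == TaskStatus.COMPLETED)
print(f"{completed}/{len(workflow.tasks)} tasks completed, total cost: {workflow.total_cost}")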

Source code in manager_agent_gym/core/execution/engine.py
def get_current_workflow_state(self) -> Workflow:
    """
    Get a snapshot of the current workflow state for external evaluation.

    This enables decoupled evaluation where external evaluators can
    assess the current state without being tightly coupled to the engine.

    Returns:
        Current workflow state with all tasks, resources, and metadata
    """
    return self.workflow

restore_from_snapshot(snapshot_dir: str, timestep: int) -> None

Restore engine state from a previous simulation snapshot.

This updates the existing engine with state from a snapshot without reconstructing the entire engine. The engine should already be constructed with fresh components (workflow, agents, etc.).

Parameters:

Name Type Description Default
snapshot_dir str

Path to simulation run directory containing timestep_data/

required
timestep int

Target timestep to restore from

required
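
Example

A sketch of resuming from a saved run (the directory path and timestep are placeholders; the engine must already be constructed with the same workflow and agent setup as the original run):

engine.restore_from_snapshot("outputs/run_001", timestep=10)
results = await engine.run_full_execution()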
Source code in manager_agent_gym/core/execution/engine.py
def restore_from_snapshot(self, snapshot_dir: str, timestep: int) -> None:
    """
    Restore engine state from a previous simulation snapshot.

    This updates the existing engine with state from a snapshot without
    reconstructing the entire engine. The engine should already be
    constructed with fresh components (workflow, agents, etc.).

    Args:
        snapshot_dir: Path to simulation run directory containing timestep_data/
        timestep: Target timestep to restore from
    """
    # Create and configure state restorer
    restorer = WorkflowStateRestorer(snapshot_dir, timestep)
    restorer.load_snapshot_data()

    logger.info("Restoring workflow state from timestep %s", timestep)

    # Restore all state components
    restorer.restore_workflow_state(self.workflow)
    restorer.restore_stakeholder_preferences(self.stakeholder_agent)
    restorer.restore_communication_history(self.communication_service)
    restorer.restore_manager_action_buffer(self.manager_agent)
    restorer.restore_active_agents(self.agent_registry)

    # Set current timestep and execution state
    self.current_timestep = timestep
    self.execution_state = ExecutionState.RUNNING

run_full_execution(save_outputs: bool = True) -> list[ExecutionResult] async

Run the complete workflow execution until completion or failure.

Returns:

Type Description
list[ExecutionResult]

List of timestep results from the execution
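
Example

A brief usage sketch (engine is assumed to be a configured WorkflowExecutionEngine; pass save_outputs=False to skip writing logs and metrics to disk):

results = await engine.run_full_execution(save_outputs=False)
print(f"executed {len(results)} timesteps; final state: {engine.execution_state.value}")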

Source code in manager_agent_gym/core/execution/engine.py
async def run_full_execution(
    self, save_outputs: bool = True
) -> list[ExecutionResult]:
    """
    Run the complete workflow execution until completion or failure.

    Returns:
        List of timestep results from the execution
    """
    self.manager_agent.set_max_timesteps(self.max_timesteps)
    self.execution_state = ExecutionState.RUNNING

    # Structured concurrency: all validation tasks run under a TaskGroup and
    # must complete before exiting this context
    async with TaskGroup() as tg:
        self._task_group = tg

        while (
            not self._is_terminal_state()
            and self.current_timestep < self.max_timesteps
        ):
            timestep_result = await self.execute_timestep()
            self.timestep_results.append(timestep_result)

            # Save workflow state after each timestep
            await self._save_workflow_state(timestep_result)

            # Check for completion
            if self.workflow.is_complete():
                self.execution_state = ExecutionState.COMPLETED
                break

            # Check for explicit end request from agents
            try:
                if (
                    self.communication_service is not None
                    and self.communication_service.is_end_workflow_requested()
                ):
                    self.execution_state = ExecutionState.CANCELLED
                    break
            except Exception:
                logger.error("Error checking end-workflow request", exc_info=True)

        # Check if we hit the timestep limit without completing
        if (
            self.current_timestep >= self.max_timesteps
            and not self.workflow.is_complete()
        ):
            self.execution_state = ExecutionState.FAILED

    # Exiting TaskGroup ensures all scheduled validations completed
    self._task_group = None

    # Run final evaluation set
    communications_sender = (
        self.communication_service.get_messages_grouped_by_sender(
            sort_within_group="time",
            include_broadcasts=True,
        )
    )
    comms_by_sender: list[SenderMessagesView] = cast(
        list[SenderMessagesView], communications_sender
    )
    manager_actions = self.manager_agent.get_action_buffer()
    await self.validation_engine.evaluate_timestep(
        workflow=self.workflow,
        timestep=self.current_timestep,
        preferences=self._get_preferences_from_stakeholder_agent(
            self.current_timestep
        ),
        workflow_evaluators=self.evaluations,
        cadence=RunCondition.ON_COMPLETION,
        communications=comms_by_sender,
        manager_actions=manager_actions,
    )

    if save_outputs:
        self.serialise_workflow_states_and_metrics()

    return self.timestep_results

serialise_workflow_states_and_metrics() -> None

Write high-level execution logs (manager actions) into execution_logs directory.

Source code in manager_agent_gym/core/execution/engine.py
def serialise_workflow_states_and_metrics(self) -> None:
    """Write high-level execution logs (manager actions) into execution_logs directory."""
    try:
        briefs = self.manager_agent.get_action_buffer()
        converted: list[tuple[int, ActionResult | None]] = [
            (b.timestep, b) for b in briefs if b.timestep is not None
        ]
        self.output_writer.save_execution_logs(converted)

    except Exception:
        logger.error("output writer failed saving execution logs", exc_info=True)

    try:
        self.output_writer.save_evaluation_outputs(
            self.validation_engine.evaluation_results,
            reward_vector=self.validation_engine.reward_vector,
        )
    except Exception:
        logger.error(
            "output writer failed saving evaluation outputs", exc_info=True
        )
    try:
        self.output_writer.save_workflow_summary(
            workflow=self.workflow,
            completed_task_ids=self.completed_task_ids,
            failed_task_ids=self.failed_task_ids,
            current_timestep=self.current_timestep,
        )
    except Exception:
        logger.error("output writer failed saving workflow summary", exc_info=True)

Manager Agents

ManagerAgent is the abstract contract for decision-making managers. Implement custom manager strategies by inheriting from this base class.

Bases: ABC

Abstract interface for manager agents.

Implementations observe the workflow, choose an action each timestep, and maintain a compact action history for downstream evaluation.

Parameters:

Name Type Description Default
agent_id str

Unique identifier for the manager agent.

required
preferences PreferenceWeights

Initial normalized preference weights.

required

Attributes:

Name Type Description
agent_id str

Identifier for logging and communications.

preferences PreferenceWeights

Current preference weights.

_action_buffer deque[ActionResult]

Recent actions (maxlen 50).

Example
class MyManager(ManagerAgent):
    async def step(self, workflow, execution_state, stakeholder_profile,
                   current_timestep, running_tasks, completed_task_ids,
                   failed_task_ids, communication_service=None,
                   previous_reward=0.0, done=False) -> BaseManagerAction:
        # decide an action...
        return NoOpAction(reasoning="Observing")
Source code in manager_agent_gym/core/manager_agent/interface.py
class ManagerAgent(ABC):
    """Abstract interface for manager agents.

    Implementations observe the workflow, choose an action each timestep,
    and maintain a compact action history for downstream evaluation.

    Args:
        agent_id (str): Unique identifier for the manager agent.
        preferences (PreferenceWeights): Initial normalized preference weights.

    Attributes:
        agent_id (str): Identifier for logging and communications.
        preferences (PreferenceWeights): Current preference weights.
        _action_buffer (deque[ActionResult]): Recent actions (maxlen 50).

    Example:
        ```python
        class MyManager(ManagerAgent):
            async def step(self, workflow, execution_state, stakeholder_profile,
                           current_timestep, running_tasks, completed_task_ids,
                           failed_task_ids, communication_service=None,
                           previous_reward=0.0, done=False) -> BaseManagerAction:
                # decide an action...
                return NoOpAction(reasoning="Observing")
        ```
    """

    def __init__(self, agent_id: str, preferences: PreferenceWeights):
        self.agent_id = agent_id
        self.preferences = preferences
        self._action_buffer: deque[ActionResult] = deque(maxlen=50)
        # Execution horizon awareness (optional; set by engine)
        self._max_timesteps: int | None = None
        # Seed configured by engine (if any)
        self._seed: int = 42

    def configure_seed(self, seed: int) -> None:
        """Configure deterministic seed for this manager (overridable)."""
        self._seed = seed

    def record_action(self, brief: ActionResult) -> None:
        self._action_buffer.append(brief)

    def get_action_buffer(
        self, number_most_recent_actions: int | None = None
    ) -> list[ActionResult]:
        if number_most_recent_actions is None or number_most_recent_actions <= 0:
            return list(self._action_buffer)

        return list(self._action_buffer)[-number_most_recent_actions:]

    def set_max_timesteps(self, max_timesteps: int | None) -> None:
        """Set the maximum timesteps for the current execution (set by engine)."""
        self._max_timesteps = (
            max_timesteps if (max_timesteps is None or max_timesteps >= 0) else None
        )

    async def create_observation(
        self,
        workflow: "Workflow",
        execution_state: "ExecutionState",
        stakeholder_profile: StakeholderPublicProfile,
        current_timestep: int,
        running_tasks: dict,
        completed_task_ids: set,
        failed_task_ids: set,
        communication_service: "CommunicationService | None" = None,
    ) -> ManagerObservation:
        """
        Create manager observation from workflow state.

        Subclasses can override this to customize what they observe.

        Args:
            workflow: Current workflow state
            execution_state: Current execution state
            current_timestep: Current timestep number
            running_tasks: Currently executing tasks
            completed_task_ids: Set of completed task IDs
            failed_task_ids: Set of failed task IDs
            communication_service: Optional communication service for messages

        Returns:
            ManagerObservation with workflow state data
        """
        # Get task status summary
        task_statuses = {}
        for status in TaskStatus:
            task_statuses[status.value] = sum(
                1 for task in workflow.tasks.values() if task.status == status
            )

        # Get ready tasks
        ready_tasks = workflow.get_ready_tasks()

        # Get available agents
        available_agents = workflow.get_available_agents()

        # Get recent messages from communication service if available
        recent_messages = []
        if communication_service:
            all_comm_messages = communication_service.get_all_messages()
            recent_messages = all_comm_messages[:10]  # Last 10 messages
        else:
            # Fallback to workflow messages for backward compatibility
            recent_messages = workflow.messages[-5:]  # Last 5 messages

        # Compute timeline awareness fields if configured
        max_ts = self._max_timesteps
        ts_remaining = None
        time_progress = None
        if isinstance(max_ts, int) and max_ts > 0:
            ts_remaining = max(0, max_ts - current_timestep - 1)
            # Clamp progress in [0,1]
            time_progress = min(1.0, max(0.0, float(current_timestep) / float(max_ts)))

        return ManagerObservation(
            workflow_summary=workflow.pretty_print(),
            timestep=current_timestep,
            workflow_id=workflow.id,
            execution_state=execution_state,
            task_status_counts=task_statuses,
            ready_task_ids=[task.id for task in ready_tasks],
            running_task_ids=list(running_tasks.keys()),
            completed_task_ids=list(completed_task_ids),
            failed_task_ids=list(failed_task_ids),
            available_agent_metadata=[agent.config for agent in available_agents],
            recent_messages=recent_messages,
            workflow_progress=len(completed_task_ids) / len(workflow.tasks)
            if workflow.tasks
            else 0.0,
            max_timesteps=max_ts,
            timesteps_remaining=ts_remaining,
            time_progress=time_progress,
            constraints=workflow.constraints,
            task_ids=list(workflow.tasks.keys()),
            resource_ids=list(workflow.resources.keys()),
            agent_ids=list(workflow.agents.keys()),
            stakeholder_profile=stakeholder_profile,
        )

    # Note: take_action(observation) has been removed from the abstract interface in favor of step(...).

    @abstractmethod
    async def step(
        self,
        workflow: "Workflow",
        execution_state: "ExecutionState",
        stakeholder_profile: StakeholderPublicProfile,
        current_timestep: int,
        running_tasks: dict,
        completed_task_ids: set,
        failed_task_ids: set,
        communication_service: "CommunicationService | None" = None,
        previous_reward: float = 0.0,
        done: bool = False,
    ) -> BaseManagerAction:
        """
        One-call RL-friendly step: build observation and return an action.
        """
        raise NotImplementedError

    def on_action_executed(
        self,
        timestep: int,
        action: BaseManagerAction,
        action_result: ActionResult | None,
    ) -> None:
        """
        Hook invoked by the engine after a manager action has been executed.

        Default implementation records a compact action brief, including a short
        outcome summary when available. Manager implementations can override
        this to customize how actions are logged or persisted.
        """
        reasoning = action.reasoning
        if action_result and action_result.summary:
            reasoning += f" | Outcome of action: {action_result.summary}"

        self.record_action(
            ActionResult(
                kind=action_result.kind if action_result else "unknown",
                timestep=timestep,
                action_type=action.action_type,  # type: ignore[attr-defined]
                summary=action_result.summary
                if action_result
                else "Could not find summary of result, action may have been attempted, but failed to run.",
                data={},
                success=action_result.success if action_result else False,
            )
        )

    @abstractmethod
    def reset(self) -> None:
        """
        Reset the manager agent state for a new workflow execution.
        """
        self._action_buffer.clear()

configure_seed(seed: int) -> None

Configure deterministic seed for this manager (overridable).

Source code in manager_agent_gym/core/manager_agent/interface.py
def configure_seed(self, seed: int) -> None:
    """Configure deterministic seed for this manager (overridable)."""
    self._seed = seed

create_observation(workflow: Workflow, execution_state: ExecutionState, stakeholder_profile: StakeholderPublicProfile, current_timestep: int, running_tasks: dict, completed_task_ids: set, failed_task_ids: set, communication_service: CommunicationService | None = None) -> ManagerObservation async

Create manager observation from workflow state.

Subclasses can override this to customize what they observe.

Parameters:

Name Type Description Default
workflow Workflow

Current workflow state

required
execution_state ExecutionState

Current execution state

required
current_timestep int

Current timestep number

required
running_tasks dict

Currently executing tasks

required
completed_task_ids set

Set of completed task IDs

required
failed_task_ids set

Set of failed task IDs

required
communication_service CommunicationService | None

Optional communication service for messages

None

Returns:

Type Description
ManagerObservation

ManagerObservation with workflow state data
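
Example

A sketch of a subclass that trims the default observation (step and reset are omitted for brevity; mutating recent_messages in place is illustrative):

class QuietManager(ManagerAgent):
    async def create_observation(self, *args, **kwargs) -> ManagerObservation:
        observation = await super().create_observation(*args, **kwargs)
        observation.recent_messages = []  # hide message history from the policy
        return observation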

Source code in manager_agent_gym/core/manager_agent/interface.py
async def create_observation(
    self,
    workflow: "Workflow",
    execution_state: "ExecutionState",
    stakeholder_profile: StakeholderPublicProfile,
    current_timestep: int,
    running_tasks: dict,
    completed_task_ids: set,
    failed_task_ids: set,
    communication_service: "CommunicationService | None" = None,
) -> ManagerObservation:
    """
    Create manager observation from workflow state.

    Subclasses can override this to customize what they observe.

    Args:
        workflow: Current workflow state
        execution_state: Current execution state
        current_timestep: Current timestep number
        running_tasks: Currently executing tasks
        completed_task_ids: Set of completed task IDs
        failed_task_ids: Set of failed task IDs
        communication_service: Optional communication service for messages

    Returns:
        ManagerObservation with workflow state data
    """
    # Get task status summary
    task_statuses = {}
    for status in TaskStatus:
        task_statuses[status.value] = sum(
            1 for task in workflow.tasks.values() if task.status == status
        )

    # Get ready tasks
    ready_tasks = workflow.get_ready_tasks()

    # Get available agents
    available_agents = workflow.get_available_agents()

    # Get recent messages from communication service if available
    recent_messages = []
    if communication_service:
        all_comm_messages = communication_service.get_all_messages()
        recent_messages = all_comm_messages[:10]  # Last 10 messages
    else:
        # Fallback to workflow messages for backward compatibility
        recent_messages = workflow.messages[-5:]  # Last 5 messages

    # Compute timeline awareness fields if configured
    max_ts = self._max_timesteps
    ts_remaining = None
    time_progress = None
    if isinstance(max_ts, int) and max_ts > 0:
        ts_remaining = max(0, max_ts - current_timestep - 1)
        # Clamp progress in [0,1]
        time_progress = min(1.0, max(0.0, float(current_timestep) / float(max_ts)))

    return ManagerObservation(
        workflow_summary=workflow.pretty_print(),
        timestep=current_timestep,
        workflow_id=workflow.id,
        execution_state=execution_state,
        task_status_counts=task_statuses,
        ready_task_ids=[task.id for task in ready_tasks],
        running_task_ids=list(running_tasks.keys()),
        completed_task_ids=list(completed_task_ids),
        failed_task_ids=list(failed_task_ids),
        available_agent_metadata=[agent.config for agent in available_agents],
        recent_messages=recent_messages,
        workflow_progress=len(completed_task_ids) / len(workflow.tasks)
        if workflow.tasks
        else 0.0,
        max_timesteps=max_ts,
        timesteps_remaining=ts_remaining,
        time_progress=time_progress,
        constraints=workflow.constraints,
        task_ids=list(workflow.tasks.keys()),
        resource_ids=list(workflow.resources.keys()),
        agent_ids=list(workflow.agents.keys()),
        stakeholder_profile=stakeholder_profile,
    )

on_action_executed(timestep: int, action: BaseManagerAction, action_result: ActionResult | None) -> None

Hook invoked by the engine after a manager action has been executed.

Default implementation records a compact action brief, including a short outcome summary when available. Manager implementations can override this to customize how actions are logged or persisted.
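
Example

A sketch of overriding the hook to add logging while keeping the default action brief (logger is a placeholder; step and reset are omitted for brevity):

class LoggingManager(ManagerAgent):
    def on_action_executed(self, timestep, action, action_result):
        logger.info(
            "t=%s action=%s success=%s",
            timestep,
            action.action_type,
            action_result.success if action_result else False,
        )
        super().on_action_executed(timestep, action, action_result)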

Source code in manager_agent_gym/core/manager_agent/interface.py
def on_action_executed(
    self,
    timestep: int,
    action: BaseManagerAction,
    action_result: ActionResult | None,
) -> None:
    """
    Hook invoked by the engine after a manager action has been executed.

    Default implementation records a compact action brief, including a short
    outcome summary when available. Manager implementations can override
    this to customize how actions are logged or persisted.
    """
    reasoning = action.reasoning
    if action_result and action_result.summary:
        reasoning += f" | Outcome of action: {action_result.summary}"

    self.record_action(
        ActionResult(
            kind=action_result.kind if action_result else "unknown",
            timestep=timestep,
            action_type=action.action_type,  # type: ignore[attr-defined]
            summary=action_result.summary
            if action_result
            else "Could not find summary of result, action may have been attempted, but failed to run.",
            data={},
            success=action_result.success if action_result else False,
        )
    )

reset() -> None abstractmethod

Reset the manager agent state for a new workflow execution.

Source code in manager_agent_gym/core/manager_agent/interface.py
@abstractmethod
def reset(self) -> None:
    """
    Reset the manager agent state for a new workflow execution.
    """
    self._action_buffer.clear()

set_max_timesteps(max_timesteps: int | None) -> None

Set the maximum timesteps for the current execution (set by engine).

Source code in manager_agent_gym/core/manager_agent/interface.py
def set_max_timesteps(self, max_timesteps: int | None) -> None:
    """Set the maximum timesteps for the current execution (set by engine)."""
    self._max_timesteps = (
        max_timesteps if (max_timesteps is None or max_timesteps >= 0) else None
    )

step(workflow: Workflow, execution_state: ExecutionState, stakeholder_profile: StakeholderPublicProfile, current_timestep: int, running_tasks: dict, completed_task_ids: set, failed_task_ids: set, communication_service: CommunicationService | None = None, previous_reward: float = 0.0, done: bool = False) -> BaseManagerAction abstractmethod async

One-call RL-friendly step: build observation and return an action.

Source code in manager_agent_gym/core/manager_agent/interface.py
@abstractmethod
async def step(
    self,
    workflow: "Workflow",
    execution_state: "ExecutionState",
    stakeholder_profile: StakeholderPublicProfile,
    current_timestep: int,
    running_tasks: dict,
    completed_task_ids: set,
    failed_task_ids: set,
    communication_service: "CommunicationService | None" = None,
    previous_reward: float = 0.0,
    done: bool = False,
) -> BaseManagerAction:
    """
    One-call RL-friendly step: build observation and return an action.
    """
    raise NotImplementedError

Worker Interfaces

AgentInterface specifies the capabilities worker agents must expose. Concrete AI and human simulations live under manager_agent_gym.core.workflow_agents.

Bases: ABC, Generic[ConfigType]

Abstract base class for all agents in the system.

Agents execute tasks and form the core workforce in workflows. They combine execution capabilities with business logic like availability and capacity management.

Source code in manager_agent_gym/core/workflow_agents/interface.py
class AgentInterface(ABC, Generic[ConfigType]):
    """
    Abstract base class for all agents in the system.

    Agents execute tasks and form the core workforce in workflows.
    They combine execution capabilities with business logic like
    availability and capacity management.
    """

    def __init__(
        self,
        config: ConfigType,
    ):
        self.config = config
        self.communication_service = COMMUNICATION_SERVICE_SINGLETON
        self._seed: int | None = None

        self.name: str = config.agent_id
        self.description: str
        self.is_available: bool = True
        self.max_concurrent_tasks: int = 1  # Humans limited, AI can override
        self.current_task_ids: list[UUID] = []

        # Performance tracking
        self.tasks_completed: int = 0
        self.joined_at: datetime = datetime.now()
        # Tool usage buffer keyed by task id
        self.tool_usage_by_task: dict[UUID, list[AgentToolUseEvent]] = {}

    def configure_seed(self, seed: int) -> None:
        """Configure deterministic seed for this agent (overridable)."""
        self._seed = seed

    @property
    def agent_id(self) -> str:
        """Get the agent's unique identifier."""
        return self.config.agent_id

    @property
    def agent_type(self) -> str:
        """Get the agent's type."""
        return self.config.agent_type

    def can_handle_task(self, task: Task) -> bool:
        """Check if agent can handle a given task based on availability."""
        if not self.is_available:
            return False
        if len(self.current_task_ids) >= self.max_concurrent_tasks:
            return False
        return True

    def record_tool_use_event(self, event: AgentToolUseEvent) -> None:
        """Record a tool usage event under the current task bucket."""
        task_id = event.task_id
        if task_id is None:
            return
        self.tool_usage_by_task.setdefault(task_id, []).append(event)

    def get_tool_usage_by_task(self) -> dict[UUID, list[AgentToolUseEvent]]:
        """Return a copy of per-task tool usage events for this agent."""
        return {k: list(v) for k, v in self.tool_usage_by_task.items()}

    @abstractmethod
    async def execute_task(
        self, task: Task, resources: list[Resource]
    ) -> ExecutionResult:
        """
        Execute a task given the task and available resources.

        Args:
            task: The task to execute
            resources: Available input resources (optional)

        Returns:
            ExecutionResult with outputs and metadata

        Raises:
            Exception: If execution fails in an unrecoverable way
        """
        pass
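
A concrete worker only needs a config plus an execute_task implementation. A minimal sketch, assuming an AIAgentConfig-style config and illustrative Task and ExecutionResult fields (the actual constructors live in the schemas package and may differ):

class EchoAgent(AgentInterface[AIAgentConfig]):  # config type is an assumption
    async def execute_task(self, task, resources):
        # Summarise the inputs; the Task and ExecutionResult field names below
        # are assumptions for illustration, not the documented schema.
        note = f"Echoed task with {len(resources)} input resource(s)"
        return ExecutionResult(task_id=task.id, success=True, notes=note)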

agent_id: str property

Get the agent's unique identifier.

agent_type: str property

Get the agent's type.

can_handle_task(task: Task) -> bool

Check if agent can handle a given task based on availability.

Source code in manager_agent_gym/core/workflow_agents/interface.py
def can_handle_task(self, task: Task) -> bool:
    """Check if agent can handle a given task based on availability."""
    if not self.is_available:
        return False
    if len(self.current_task_ids) >= self.max_concurrent_tasks:
        return False
    return True

configure_seed(seed: int) -> None

Configure deterministic seed for this agent (overridable).

Source code in manager_agent_gym/core/workflow_agents/interface.py
def configure_seed(self, seed: int) -> None:
    """Configure deterministic seed for this agent (overridable)."""
    self._seed = seed

execute_task(task: Task, resources: list[Resource]) -> ExecutionResult abstractmethod async

Execute a task given the task and available resources.

Parameters:

Name Type Description Default
task Task

The task to execute

required
resources list[Resource]

Available input resources (optional)

required

Returns:

Type Description
ExecutionResult

ExecutionResult with outputs and metadata

Raises:

Type Description
Exception

If execution fails in an unrecoverable way

Source code in manager_agent_gym/core/workflow_agents/interface.py
@abstractmethod
async def execute_task(
    self, task: Task, resources: list[Resource]
) -> ExecutionResult:
    """
    Execute a task given the task and available resources.

    Args:
        task: The task to execute
        resources: Available input resources (optional)

    Returns:
        ExecutionResult with outputs and metadata

    Raises:
        Exception: If execution fails in an unrecoverable way
    """
    pass

get_tool_usage_by_task() -> dict[UUID, list[AgentToolUseEvent]]

Return a copy of per-task tool usage events for this agent.

Source code in manager_agent_gym/core/workflow_agents/interface.py
def get_tool_usage_by_task(self) -> dict[UUID, list[AgentToolUseEvent]]:
    """Return a copy of per-task tool usage events for this agent."""
    return {k: list(v) for k, v in self.tool_usage_by_task.items()}

record_tool_use_event(event: AgentToolUseEvent) -> None

Record a tool usage event under the current task bucket.

Source code in manager_agent_gym/core/workflow_agents/interface.py
def record_tool_use_event(self, event: AgentToolUseEvent) -> None:
    """Record a tool usage event under the current task bucket."""
    task_id = event.task_id
    if task_id is None:
        return
    self.tool_usage_by_task.setdefault(task_id, []).append(event)

Agent Registry

AgentRegistry tracks available agents and provides helpers for registering them with an execution engine.

Dynamic registry for agents participating in a workflow run.

Maintains agent instances, allows late binding of agent classes, and optionally schedules agents to join/leave at specific timesteps.

Example
reg = AgentRegistry()
reg.register_agent_class("ai", AIAgent)
reg.register_agent_class("human_mock", MockHumanAgent)
reg.register_ai_agent(AIAgentConfig(agent_id="ai_analyst"), [])
Source code in manager_agent_gym/core/workflow_agents/registry.py
class AgentRegistry:
    """Dynamic registry for agents participating in a workflow run.

    Maintains agent instances, allows late binding of agent classes, and
    optionally schedules agents to join/leave at specific timesteps.

    Example:
        ```python
        reg = AgentRegistry()
        reg.register_agent_class("ai", AIAgent)
        reg.register_agent_class("human_mock", MockHumanAgent)
        reg.register_ai_agent(AIAgentConfig(agent_id="ai_analyst"), [])
        ```
    """

    def __init__(self):
        self._agents: dict[str, AgentInterface] = {}
        self._agent_classes: dict[str, Type[AgentInterface]] = {}
        # Optional: simple built-in scheduler for adding/removing agents at timesteps
        self._scheduled_changes: dict[int, list[ScheduledAgentChange]] = {}
        self._executed_change_timesteps: set[int] = set()

    def register_agent_class(
        self, agent_type: str, agent_class: Type[AgentInterface]
    ) -> None:
        """
        Register an agent class for a specific agent type.

        Args:
            agent_type: The type identifier for the agent
            agent_class: The agent class to register
        """
        self._agent_classes[agent_type] = agent_class

    def create_agent(self, config: AgentConfig) -> AgentInterface:
        """
        Create an agent instance from configuration.

        Args:
            config: Agent configuration

        Returns:
            Agent instance

        Raises:
            ValueError: If agent type is not registered
        """
        if config.agent_type not in self._agent_classes:
            raise ValueError(f"Unknown agent type: {config.agent_type}")

        agent_class = self._agent_classes[config.agent_type]
        agent = agent_class(config)
        self._agents[config.agent_id] = agent
        return agent

    def register_agent(self, agent: AgentInterface) -> None:
        """
        Register an existing agent instance.

        Args:
            agent: The agent instance to register
        """
        self._agents[agent.agent_id] = agent

    def get_agent(self, agent_id: str) -> AgentInterface | None:
        """
        Get an agent by ID.

        Args:
            agent_id: The agent's unique identifier

        Returns:
            Agent instance or None if not found
        """
        return self._agents.get(agent_id)

    def list_agents(self) -> list[AgentInterface]:
        """
        Get all registered agents.

        Returns:
            List of all agent instances
        """
        return list(self._agents.values())

    def remove_agent(self, agent_id: str) -> bool:
        """
        Remove an agent from the registry.

        Args:
            agent_id: The agent's unique identifier

        Returns:
            True if agent was removed, False if not found
        """
        if agent_id in self._agents:
            del self._agents[agent_id]
            return True
        return False

    def clear(self) -> None:
        """Clear all registered agents."""
        self._agents.clear()

    def get_agent_stats(self) -> dict[str, int]:
        """
        Get statistics about registered agents.

        Returns:
            Dictionary with agent type counts
        """
        stats: dict[str, int] = {}
        for agent in self._agents.values():
            agent_type = agent.agent_type
            stats[agent_type] = stats.get(agent_type, 0) + 1
        return stats

    def register_ai_agent(
        self, config: AgentConfig | AIAgentConfig, additional_tools: list[Tool]
    ) -> None:
        """
        Create and register an AI agent.

        Args:
            config: Agent configuration
            additional_tools: List of tools for the agent

        Returns:
            None; the created agent is registered with the registry.
        """
        if not additional_tools:
            additional_tools = ToolFactory.create_ai_tools()

        if not isinstance(config, AIAgentConfig):
            raise ValueError(
                "config must be an AIAgentConfig, received: " + str(type(config))
            )

        agent = AIAgent(config=config, tools=additional_tools)
        self.register_agent(agent)

    def register_human_agent(
        self,
        config: HumanAgentConfig,
        additional_tools: list[Tool],
    ) -> None:
        """
        Create and register a human mock agent.

        Args:
            config: Human agent configuration (includes persona and noise settings)
            additional_tools: List of tools for the agent

        Returns:
            None; the created human mock agent is registered with the registry.
        """
        if not additional_tools:
            additional_tools = ToolFactory.create_human_tools()

        agent = MockHumanAgent(config, additional_tools)
        self.register_agent(agent)

    # --- Optional simple scheduling API (can replace AgentCoordinationEngine) ---
    def schedule_agent_add(
        self,
        timestep: int,
        config: AIAgentConfig | HumanAgentConfig,
        reason: str = "",
    ) -> None:
        change = ScheduledAgentChange(
            timestep=timestep,
            action="add",
            agent_config=config,
            reason=reason,
        )
        self._scheduled_changes.setdefault(timestep, []).append(change)

    def schedule_agent_remove(
        self, timestep: int, agent_id: str, reason: str = ""
    ) -> None:
        change = ScheduledAgentChange(
            timestep=timestep,
            action="remove",
            agent_id=agent_id,
            reason=reason,
        )
        self._scheduled_changes.setdefault(timestep, []).append(change)

    def apply_scheduled_changes_for_timestep(
        self,
        timestep: int,
        communication_service: "CommunicationService | None" = None,
        tool_factory: "ToolFactoryType | None" = None,
    ) -> list[str]:
        """
        Apply any scheduled add/remove operations for the given timestep.

        Returns a list of human-readable change descriptions.
        """
        if (
            timestep not in self._scheduled_changes
            or timestep in self._executed_change_timesteps
        ):
            return []

        changes: list[str] = []
        for change in self._scheduled_changes.get(timestep, []):
            if change.action == "add" and change.agent_config is not None:
                # Prepare tools and register based on type
                tools: list[Tool] = []
                if tool_factory is not None and communication_service is not None:
                    tools = tool_factory.add_communication_tools(
                        tools, communication_service, change.agent_config.agent_id
                    )

                if isinstance(change.agent_config, AIAgentConfig):
                    self.register_ai_agent(change.agent_config, tools)
                elif isinstance(change.agent_config, HumanAgentConfig):
                    self.register_human_agent(change.agent_config, tools)
                else:
                    changes.append(
                        f"Unsupported agent type for add: {type(change.agent_config)}"
                    )
                    continue

                # Inject comms if provided
                if communication_service is not None:
                    agent = self.get_agent(change.agent_config.agent_id)
                    if agent is not None:
                        agent.communication_service = communication_service

                changes.append(f"Added {change.agent_config.agent_id}: {change.reason}")

            elif change.action == "remove" and change.agent_id is not None:
                removed = self.remove_agent(change.agent_id)
                if removed:
                    changes.append(f"Removed {change.agent_id}: {change.reason}")
                else:
                    changes.append(
                        f"Could not remove {change.agent_id}: {change.reason}"
                    )
            else:
                changes.append("Invalid scheduled change entry")

        self._executed_change_timesteps.add(timestep)
        return changes
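
Beyond up-front registration, the registry can stage roster changes that the engine applies as timesteps advance. A minimal sketch built from the methods above (AIAgentConfig is constructed with only an agent_id here, mirroring the example in the class docstring):

reg = AgentRegistry()
reg.register_ai_agent(AIAgentConfig(agent_id="ai_analyst"), [])

# Stage roster changes for later timesteps.
reg.schedule_agent_add(
    timestep=3,
    config=AIAgentConfig(agent_id="ai_reviewer"),
    reason="extra review capacity mid-run",
)
reg.schedule_agent_remove(timestep=8, agent_id="ai_analyst", reason="analysis phase complete")

# The engine normally calls this each timestep; calling it directly is useful in tests.
for change in reg.apply_scheduled_changes_for_timestep(3):
    print(change)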

apply_scheduled_changes_for_timestep(timestep: int, communication_service: CommunicationService | None = None, tool_factory: ToolFactoryType | None = None) -> list[str]

Apply any scheduled add/remove operations for the given timestep.

Returns a list of human-readable change descriptions.

Source code in manager_agent_gym/core/workflow_agents/registry.py
def apply_scheduled_changes_for_timestep(
    self,
    timestep: int,
    communication_service: "CommunicationService | None" = None,
    tool_factory: "ToolFactoryType | None" = None,
) -> list[str]:
    """
    Apply any scheduled add/remove operations for the given timestep.

    Returns a list of human-readable change descriptions.
    """
    if (
        timestep not in self._scheduled_changes
        or timestep in self._executed_change_timesteps
    ):
        return []

    changes: list[str] = []
    for change in self._scheduled_changes.get(timestep, []):
        if change.action == "add" and change.agent_config is not None:
            # Prepare tools and register based on type
            tools: list[Tool] = []
            if tool_factory is not None and communication_service is not None:
                tools = tool_factory.add_communication_tools(
                    tools, communication_service, change.agent_config.agent_id
                )

            if isinstance(change.agent_config, AIAgentConfig):
                self.register_ai_agent(change.agent_config, tools)
            elif isinstance(change.agent_config, HumanAgentConfig):
                self.register_human_agent(change.agent_config, tools)
            else:
                changes.append(
                    f"Unsupported agent type for add: {type(change.agent_config)}"
                )
                continue

            # Inject comms if provided
            if communication_service is not None:
                agent = self.get_agent(change.agent_config.agent_id)
                if agent is not None:
                    agent.communication_service = communication_service

            changes.append(f"Added {change.agent_config.agent_id}: {change.reason}")

        elif change.action == "remove" and change.agent_id is not None:
            removed = self.remove_agent(change.agent_id)
            if removed:
                changes.append(f"Removed {change.agent_id}: {change.reason}")
            else:
                changes.append(
                    f"Could not remove {change.agent_id}: {change.reason}"
                )
        else:
            changes.append("Invalid scheduled change entry")

    self._executed_change_timesteps.add(timestep)
    return changes

clear() -> None

Clear all registered agents.

Source code in manager_agent_gym/core/workflow_agents/registry.py
def clear(self) -> None:
    """Clear all registered agents."""
    self._agents.clear()

create_agent(config: AgentConfig) -> AgentInterface

Create an agent instance from configuration.

Parameters:

Name Type Description Default
config AgentConfig

Agent configuration

required

Returns:

Type Description
AgentInterface

Agent instance

Raises:

Type Description
ValueError

If agent type is not registered

Source code in manager_agent_gym/core/workflow_agents/registry.py
def create_agent(self, config: AgentConfig) -> AgentInterface:
    """
    Create an agent instance from configuration.

    Args:
        config: Agent configuration

    Returns:
        Agent instance

    Raises:
        ValueError: If agent type is not registered
    """
    if config.agent_type not in self._agent_classes:
        raise ValueError(f"Unknown agent type: {config.agent_type}")

    agent_class = self._agent_classes[config.agent_type]
    agent = agent_class(config)
    self._agents[config.agent_id] = agent
    return agent

get_agent(agent_id: str) -> AgentInterface | None

Get an agent by ID.

Parameters:

Name Type Description Default
agent_id str

The agent's unique identifier

required

Returns:

Type Description
AgentInterface | None

Agent instance or None if not found

Source code in manager_agent_gym/core/workflow_agents/registry.py
def get_agent(self, agent_id: str) -> AgentInterface | None:
    """
    Get an agent by ID.

    Args:
        agent_id: The agent's unique identifier

    Returns:
        Agent instance or None if not found
    """
    return self._agents.get(agent_id)

get_agent_stats() -> dict[str, int]

Get statistics about registered agents.

Returns:

Type Description
dict[str, int]

Dictionary with agent type counts

Source code in manager_agent_gym/core/workflow_agents/registry.py
def get_agent_stats(self) -> dict[str, int]:
    """
    Get statistics about registered agents.

    Returns:
        Dictionary with agent type counts
    """
    stats: dict[str, int] = {}
    for agent in self._agents.values():
        agent_type = agent.agent_type
        stats[agent_type] = stats.get(agent_type, 0) + 1
    return stats

list_agents() -> list[AgentInterface]

Get all registered agents.

Returns:

Type Description
list[AgentInterface]

List of all agent instances

Source code in manager_agent_gym/core/workflow_agents/registry.py
def list_agents(self) -> list[AgentInterface]:
    """
    Get all registered agents.

    Returns:
        List of all agent instances
    """
    return list(self._agents.values())

register_agent(agent: AgentInterface) -> None

Register an existing agent instance.

Parameters:

Name Type Description Default
agent AgentInterface

The agent instance to register

required
Source code in manager_agent_gym/core/workflow_agents/registry.py
def register_agent(self, agent: AgentInterface) -> None:
    """
    Register an existing agent instance.

    Args:
        agent: The agent instance to register
    """
    self._agents[agent.agent_id] = agent

register_agent_class(agent_type: str, agent_class: Type[AgentInterface]) -> None

Register an agent class for a specific agent type.

Parameters:

Name Type Description Default
agent_type str

The type identifier for the agent

required
agent_class Type[AgentInterface]

The agent class to register

required
Source code in manager_agent_gym/core/workflow_agents/registry.py
def register_agent_class(
    self, agent_type: str, agent_class: Type[AgentInterface]
) -> None:
    """
    Register an agent class for a specific agent type.

    Args:
        agent_type: The type identifier for the agent
        agent_class: The agent class to register
    """
    self._agent_classes[agent_type] = agent_class

register_ai_agent(config: AgentConfig | AIAgentConfig, additional_tools: list[Tool]) -> None

Create and register an AI agent.

Parameters:

Name Type Description Default
config AgentConfig | AIAgentConfig

Agent configuration

required
additional_tools list[Tool]

List of tools for the agent

required

Returns:

Type Description
None

The created AI agent is registered with the registry rather than returned.

Source code in manager_agent_gym/core/workflow_agents/registry.py
def register_ai_agent(
    self, config: AgentConfig | AIAgentConfig, additional_tools: list[Tool]
) -> None:
    """
    Create and register an AI agent.

    Args:
        config: Agent configuration
        additional_tools: List of tools for the agent

    Returns:
        None; the created agent is registered with the registry.
    """
    if not additional_tools:
        additional_tools = ToolFactory.create_ai_tools()

    if not isinstance(config, AIAgentConfig):
        raise ValueError(
            "config must be an AIAgentConfig, received: " + str(type(config))
        )

    agent = AIAgent(config=config, tools=additional_tools)
    self.register_agent(agent)

register_human_agent(config: HumanAgentConfig, additional_tools: list[Tool]) -> None

Create and register a human mock agent.

Parameters:

Name Type Description Default
config HumanAgentConfig

Human agent configuration (includes persona and noise settings)

required
additional_tools list[Tool]

List of tools for the agent

required

Returns:

Type Description
None

The created human mock agent is registered with the registry rather than returned.

Source code in manager_agent_gym/core/workflow_agents/registry.py
def register_human_agent(
    self,
    config: HumanAgentConfig,
    additional_tools: list[Tool],
) -> None:
    """
    Create and register a human mock agent.

    Args:
        config: Human agent configuration (includes persona and noise settings)
        additional_tools: List of tools for the agent

    Returns:
        None; the created human mock agent is registered with the registry.
    """
    if not additional_tools:
        additional_tools = ToolFactory.create_human_tools()

    agent = MockHumanAgent(config, additional_tools)
    self.register_agent(agent)

remove_agent(agent_id: str) -> bool

Remove an agent from the registry.

Parameters:

Name Type Description Default
agent_id str

The agent's unique identifier

required

Returns:

Type Description
bool

True if agent was removed, False if not found

Source code in manager_agent_gym/core/workflow_agents/registry.py
def remove_agent(self, agent_id: str) -> bool:
    """
    Remove an agent from the registry.

    Args:
        agent_id: The agent's unique identifier

    Returns:
        True if agent was removed, False if not found
    """
    if agent_id in self._agents:
        del self._agents[agent_id]
        return True
    return False

Communication Service

CommunicationService stores threaded conversations and broadcasts between agents and stakeholders throughout a run.

Centralized message hub for agent interactions.

Implements the Communication (C) component of the POSG model. Provides direct, multicast, and broadcast messaging, conversation threads, and grouped views for manager oversight.

Example
comm = CommunicationService()
await comm.broadcast_message(from_agent="manager", content="Kickoff")
inbox = comm.get_messages_for_agent("agent_a", limit=20)
Source code in manager_agent_gym/core/communication/service.py
class CommunicationService:
    """Centralized message hub for agent interactions.

    Implements the Communication (C) component of the POSG model. Provides
    direct, multicast, and broadcast messaging, conversation threads, and
    grouped views for manager oversight.

    Example:
        ```python
        comm = CommunicationService()
        await comm.broadcast_message(from_agent="manager", content="Kickoff")
        inbox = comm.get_messages_for_agent("agent_a", limit=20)
        ```
    """

    def __init__(self):
        """Initialize the communication service."""
        self.graph = CommunicationGraph()
        self._message_listeners: dict[
            str, list[Callable[[Message], Awaitable[None]] | Callable[[Message], None]]
        ] = {}
        self._lock = asyncio.Lock()
        # Workflow control flags (per-instance)
        self._end_workflow_requested: bool = False

    def request_end_workflow(self, reason: str | None = None) -> None:
        """Signal that the current workflow should end as soon as possible."""
        self._end_workflow_requested = True
        if reason:
            logger.warning(f"End workflow requested: {reason}")
        else:
            logger.warning("End workflow requested")

    def is_end_workflow_requested(self) -> bool:
        """Return True if an end-of-workflow has been requested."""
        return self._end_workflow_requested

    async def send_direct_message(
        self,
        from_agent: str,
        to_agent: str,
        content: str,
        message_type: MessageType = MessageType.DIRECT,
        related_task_id: UUID | None = None,
        thread_id: UUID | None = None,
        priority: int = 1,
    ) -> Message:
        """
        Send a direct message between two agents.

        Args:
            from_agent: ID of the sending agent
            to_agent: ID of the receiving agent
            content: Message content
            message_type: Type of message
            related_task_id: Optional task this message relates to
            thread_id: Optional conversation thread
            priority: Message priority (1-5)

        Returns:
            The created Message object
        """
        async with self._lock:
            message = Message(
                sender_id=from_agent,
                receiver_id=to_agent,
                content=content,
                message_type=message_type,
                related_task_id=related_task_id,
                thread_id=thread_id,
                priority=priority,
            )

            # Add to graph
            self.graph.add_message(message)

            # Notify listeners
            await self._notify_listeners(message)

            logger.info(
                f"Direct message sent: {from_agent} -> {to_agent} "
                f"[{message_type.value}]: {content[:50]}..."
            )

            return message

    async def broadcast_message(
        self,
        from_agent: str,
        content: str,
        message_type: MessageType = MessageType.BROADCAST,
        related_task_id: UUID | None = None,
        exclude_agents: list[str] | None = None,
        priority: int = 1,
    ) -> Message:
        """
        Broadcast a message to all agents in the system.

        Args:
            from_agent: ID of the sending agent
            content: Message content
            message_type: Type of message
            related_task_id: Optional task this message relates to
            exclude_agents: Optional list of agents to exclude from broadcast
            priority: Message priority (1-5)

        Returns:
            The created Message object
        """
        async with self._lock:
            # Get all known agents except sender and excluded agents
            all_agents = self.graph.agent_registry.copy()
            all_agents.discard(from_agent)

            if exclude_agents:
                for excluded in exclude_agents:
                    all_agents.discard(excluded)

            message = Message(
                sender_id=from_agent,
                receiver_id=None,  # None indicates broadcast
                recipients=list(all_agents),
                content=content,
                message_type=message_type,
                related_task_id=related_task_id,
                priority=priority,
            )

            # Add to graph
            self.graph.add_message(message)

            # Notify listeners
            await self._notify_listeners(message)

            logger.info(
                f"Broadcast message sent: {from_agent} -> ALL "
                f"[{message_type.value}]: {content[:50]}... "
                f"({len(all_agents)} recipients)"
            )

            return message

    async def send_multicast_message(
        self,
        from_agent: str,
        to_agents: list[str],
        content: str,
        message_type: MessageType = MessageType.DIRECT,
        related_task_id: UUID | None = None,
        thread_id: UUID | None = None,
        priority: int = 1,
    ) -> Message:
        """
        Send a message to multiple specific agents.

        Args:
            from_agent: ID of the sending agent
            to_agents: List of recipient agent IDs
            content: Message content
            message_type: Type of message
            related_task_id: Optional task this message relates to
            thread_id: Optional conversation thread
            priority: Message priority (1-5)

        Returns:
            The created Message object
        """
        async with self._lock:
            message = Message(
                sender_id=from_agent,
                receiver_id=to_agents[0] if to_agents else None,
                recipients=to_agents,
                content=content,
                message_type=message_type,
                related_task_id=related_task_id,
                thread_id=thread_id,
                priority=priority,
            )

            # Add to graph
            self.graph.add_message(message)

            # Notify listeners
            await self._notify_listeners(message)

            logger.info(
                f"Multicast message sent: {from_agent} -> {to_agents} "
                f"[{message_type.value}]: {content[:50]}..."
            )

            return message

    def get_messages_for_agent(
        self,
        agent_id: str,
        since: datetime | None = None,
        message_types: list[MessageType] | None = None,
        related_to_task: UUID | None = None,
        limit: int | None = None,
        include_broadcasts: bool = True,
    ) -> list[Message]:
        """
        Get messages sent to a specific agent with filtering.

        Args:
            agent_id: The agent to get messages for
            since: Only return messages after this timestamp
            message_types: Only return messages of these types
            related_to_task: Only return messages related to this task
            limit: Maximum number of messages to return
            include_broadcasts: Whether to include broadcast messages

        Returns:
            List of messages sorted by timestamp (newest first)
        """
        messages = self.graph.get_messages_for_agent(
            agent_id=agent_id, since=since, message_types=message_types, limit=limit
        )

        # Additional filtering
        if related_to_task is not None:
            messages = [m for m in messages if m.related_task_id == related_to_task]

        if not include_broadcasts:
            messages = [m for m in messages if not m.is_broadcast()]

        return messages

    def get_conversation_history(
        self, agent_id: str, other_agent: str, limit: int = 50
    ) -> list[Message]:
        """
        Get conversation history between two agents.

        Args:
            agent_id: First agent ID
            other_agent: Second agent ID
            limit: Maximum number of messages to return

        Returns:
            List of messages in chronological order
        """
        return self.graph.get_conversation_history(
            agent_id=agent_id, other_agent=other_agent, limit=limit
        )

    def get_task_communications(self, task_id: UUID) -> list[Message]:
        """
        Get all communications related to a specific task.

        Args:
            task_id: The task ID to get communications for

        Returns:
            List of task-related messages sorted by timestamp
        """
        task_messages = [
            msg
            for msg in self.graph.messages.values()
            if msg.related_task_id == task_id
        ]

        # Sort by timestamp
        task_messages.sort(key=lambda m: m.timestamp)
        return task_messages

    def get_recent_broadcasts(
        self, since_minutes: int = 60, limit: int = 10
    ) -> list[Message]:
        """
        Get recent broadcast messages.

        Args:
            since_minutes: How many minutes back to look
            limit: Maximum number of broadcasts to return

        Returns:
            List of recent broadcast messages
        """
        since_time = datetime.now() - timedelta(minutes=since_minutes)

        broadcasts = [
            msg
            for msg in self.graph.messages.values()
            if msg.is_broadcast() and msg.timestamp >= since_time
        ]

        # Sort by timestamp (newest first) and apply limit
        broadcasts.sort(key=lambda m: m.timestamp, reverse=True)
        return broadcasts[:limit] if limit else broadcasts

    def get_all_messages(self) -> list[Message]:
        """
        Get all messages in the communication system.

        Useful for manager oversight and debugging/examples.

        Returns:
            List of all messages sorted by timestamp (newest first)
        """
        all_messages = list(self.graph.messages.values())
        return sorted(all_messages, key=lambda m: m.timestamp, reverse=True)

    def get_all_messages_grouped(
        self,
        grouping: MessageGrouping = MessageGrouping.BY_SENDER,
        sort_within_group: str = "time",  # "time" or "thread"
        include_broadcasts: bool = True,
    ) -> list[SenderMessagesView] | list[ThreadMessagesView]:
        """
        Return a strongly-typed view of all messages grouped by sender or thread.

        Args:
            grouping: How to group messages (by sender or by thread).
            sort_within_group: Sort messages inside each group by "time" (default) or by "thread" id.
            include_broadcasts: Whether to include broadcast messages in groups.

        Returns:
            A list of `SenderMessagesView` when grouping by sender, or a list of
            `ThreadMessagesView` when grouping by thread.
        """
        messages = list(self.graph.messages.values())
        if not include_broadcasts:
            messages = [m for m in messages if not m.is_broadcast()]

        if grouping == MessageGrouping.BY_SENDER:
            by_sender: dict[str, list[Message]] = {}
            for msg in messages:
                by_sender.setdefault(msg.sender_id, []).append(msg)

            views: list[SenderMessagesView] = []
            for sender_id, msgs in by_sender.items():
                # Sort within group
                if sort_within_group == "thread":
                    msgs.sort(
                        key=lambda m: (
                            str(m.thread_id) if m.thread_id else "",
                            m.timestamp,
                        )
                    )
                else:
                    msgs.sort(key=lambda m: m.timestamp)

                most_recent = max(msgs, key=lambda m: m.timestamp).timestamp
                views.append(
                    SenderMessagesView(
                        sender_id=sender_id,
                        total_messages=len(msgs),
                        most_recent_at=most_recent,
                        messages=msgs,
                    )
                )

            # Sort groups by most recent activity desc
            views.sort(key=lambda v: v.most_recent_at, reverse=True)
            return views

        # Grouping by thread
        by_thread: dict[str, list[Message]] = {}
        for msg in messages:
            key = str(msg.thread_id) if msg.thread_id else "NO_THREAD"
            by_thread.setdefault(key, []).append(msg)

        views_t: list[ThreadMessagesView] = []
        for key, msgs in by_thread.items():
            # Sort within group
            if sort_within_group == "thread":
                msgs.sort(
                    key=lambda m: (str(m.thread_id) if m.thread_id else "", m.timestamp)
                )
            else:
                msgs.sort(key=lambda m: m.timestamp)

            last_activity = max(msgs, key=lambda m: m.timestamp).timestamp
            thread_uuid = msgs[0].thread_id if msgs and msgs[0].thread_id else None
            thread_meta = self.graph.threads.get(thread_uuid) if thread_uuid else None
            participants: list[str] = sorted(
                list(
                    {m.sender_id for m in msgs}
                    | set.union(*[m.get_all_recipients() for m in msgs])
                    if msgs
                    else set()
                )
            )
            views_t.append(
                ThreadMessagesView(
                    thread_id=thread_uuid,
                    topic=thread_meta.topic if thread_meta else None,
                    participants=participants,
                    related_task_id=thread_meta.related_task_id
                    if thread_meta
                    else None,
                    total_messages=len(msgs),
                    last_activity=last_activity,
                    messages=msgs,
                )
            )

        # Sort groups by last activity desc; ensure NO_THREAD group comes last
        views_t.sort(key=lambda v: (v.last_activity, v.thread_id is None), reverse=True)
        return views_t

    def get_messages_grouped_by_sender(
        self,
        sort_within_group: str = "time",
        include_broadcasts: bool = True,
    ) -> list[SenderMessagesView]:
        """Typed helper: return messages grouped by sender."""
        views = self.get_all_messages_grouped(
            grouping=MessageGrouping.BY_SENDER,
            sort_within_group=sort_within_group,
            include_broadcasts=include_broadcasts,
        )
        return cast(list[SenderMessagesView], views)

    def get_manager_view(self) -> dict[str, Any]:
        """
        Get complete communication overview for manager agent.

        Provides full visibility into all communications for oversight.

        Returns:
            Dictionary containing comprehensive communication data
        """
        total_messages = len(self.graph.messages)
        active_threads = len([t for t in self.graph.threads.values() if t.is_active])

        # Recent activity summary
        recent_time = datetime.now() - timedelta(hours=1)
        recent_messages = [
            msg for msg in self.graph.messages.values() if msg.timestamp >= recent_time
        ]

        # Communication patterns
        agent_stats = {}
        for agent_id in self.graph.agent_registry:
            agent_stats[agent_id] = self.graph.get_agent_communication_stats(agent_id)

        return {
            "total_messages": total_messages,
            "active_threads": active_threads,
            "recent_messages_count": len(recent_messages),
            "known_agents": list(self.graph.agent_registry),
            "agent_statistics": agent_stats,
            "communication_edges": len(self.graph.edges),
            "recent_activity": [
                {
                    "message_id": str(msg.message_id),
                    "sender": msg.sender_id,
                    "recipient": msg.receiver_id,
                    "type": msg.message_type.value,
                    "timestamp": msg.timestamp.isoformat(),
                    "content_preview": msg.content[:100] + "..."
                    if len(msg.content) > 100
                    else msg.content,
                }
                for msg in sorted(
                    recent_messages, key=lambda m: m.timestamp, reverse=True
                )[:20]
            ],
        }

    def get_agent_view(self, agent_id: str) -> dict[str, Any]:
        """
        Get filtered communication view for a specific agent.

        Args:
            agent_id: The agent to get the view for

        Returns:
            Dictionary containing agent-specific communication data
        """
        # Get recent messages for this agent
        recent_messages = self.get_messages_for_agent(
            agent_id=agent_id, since=datetime.now() - timedelta(hours=2), limit=20
        )

        # Get agent statistics
        stats = self.graph.get_agent_communication_stats(agent_id)

        # Get active conversations
        active_conversations = {}
        for msg in recent_messages:
            if not msg.is_broadcast():
                other_agent = (
                    msg.sender_id if msg.sender_id != agent_id else msg.receiver_id
                )
                if other_agent and other_agent not in active_conversations:
                    active_conversations[other_agent] = self.get_conversation_history(
                        agent_id, other_agent, limit=5
                    )

        return {
            "agent_id": agent_id,
            "recent_messages": [
                {
                    "from": msg.sender_id,
                    "content": msg.content,
                    "type": msg.message_type.value,
                    "timestamp": msg.timestamp.isoformat(),
                    "task_id": str(msg.related_task_id)
                    if msg.related_task_id
                    else None,
                }
                for msg in recent_messages
            ],
            "statistics": stats,
            "active_conversations": {
                other_agent: [
                    {
                        "from": msg.sender_id,
                        "content": msg.content,
                        "timestamp": msg.timestamp.isoformat(),
                    }
                    for msg in messages
                ]
                for other_agent, messages in active_conversations.items()
            },
        }

    def mark_message_read(self, message_id: UUID, agent_id: str) -> bool:
        """
        Mark a message as read by an agent.

        Args:
            message_id: The message to mark as read
            agent_id: The agent who read the message

        Returns:
            True if successful, False if message not found
        """
        if message_id in self.graph.messages:
            self.graph.messages[message_id].mark_read_by(agent_id)
            return True
        return False

    def create_thread(
        self,
        participants: list[str],
        topic: str | None = None,
        related_task_id: UUID | None = None,
    ) -> CommunicationThread:
        """
        Create a new conversation thread.

        Args:
            participants: List of agent IDs to include in the thread
            topic: Optional topic for the thread
            related_task_id: Optional task this thread relates to

        Returns:
            The created CommunicationThread
        """
        thread = CommunicationThread(
            participants=set(participants), topic=topic, related_task_id=related_task_id
        )

        self.graph.threads[thread.thread_id] = thread

        logger.info(
            f"Created conversation thread {thread.thread_id} "
            f"with participants: {participants}"
        )

        return thread

    async def add_message_to_thread(
        self,
        thread_id: UUID,
        from_agent: str,
        to_agent: str | None,
        content: str,
        message_type: MessageType = MessageType.DIRECT,
        priority: int = 1,
    ) -> Message:
        """Add a message into an existing thread (direct to one or multicast to participants)."""
        async with self._lock:
            # If recipient omitted, default to broadcast within thread participants (excluding sender)
            recipients: list[str] | None = None
            receiver_id: str | None = None
            if to_agent is None and thread_id in self.graph.threads:
                participants = list(self.graph.threads[thread_id].participants)
                if from_agent in participants:
                    participants.remove(from_agent)
                recipients = participants
            else:
                receiver_id = to_agent

            related_task = (
                self.graph.threads[thread_id].related_task_id
                if thread_id in self.graph.threads
                else None
            )

            if not recipients:
                raise ValueError(
                    "No recipients specified for sending a message to a thread."
                )

            message = Message(
                sender_id=from_agent,
                receiver_id=receiver_id,
                recipients=recipients,
                content=content,
                message_type=message_type,
                related_task_id=related_task,
                thread_id=thread_id,
                priority=priority,
            )

            self.graph.add_message(message)
            await self._notify_listeners(message)
            logger.info(
                f"Thread message sent: {from_agent} -> {to_agent or 'thread'} [{message_type.value}]"
            )
            return message

    async def add_message_listener(
        self,
        agent_id: str,
        callback: Callable[[Message], Awaitable[None]] | Callable[[Message], None],
    ) -> None:
        """
        Add a listener for new messages to an agent.

        Args:
            agent_id: The agent to listen for messages to
            callback: Function to call when new messages arrive
        """
        if agent_id not in self._message_listeners:
            self._message_listeners[agent_id] = []

        self._message_listeners[agent_id].append(callback)
        logger.info(f"Added message listener for agent {agent_id}")

    async def _notify_listeners(self, message: Message) -> None:
        """Notify all registered listeners about a new message."""
        # Notify listeners for specific recipients
        for recipient in message.get_all_recipients():
            if recipient in self._message_listeners:
                for callback in self._message_listeners[recipient]:
                    try:
                        if asyncio.iscoroutinefunction(callback):
                            await callback(message)
                        else:
                            callback(message)
                    except Exception as e:
                        logger.error(f"Error notifying message listener: {e}")

        # Notify broadcast listeners if it's a broadcast
        if message.is_broadcast() and "BROADCAST" in self._message_listeners:
            for callback in self._message_listeners["BROADCAST"]:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(message)
                    else:
                        callback(message)
                except Exception as e:
                    logger.error(f"Error notifying broadcast listener: {e}")

    def get_communication_analytics(self) -> dict[str, Any]:
        """
        Get analytics and insights about communication patterns.

        Returns:
            Dictionary containing communication analytics
        """
        total_messages = len(self.graph.messages)
        if total_messages == 0:
            return {"total_messages": 0, "analytics": "No messages to analyze"}

        # Message type distribution
        type_counts: dict[str, int] = {}
        for msg in self.graph.messages.values():
            msg_type = msg.message_type.value
            type_counts[msg_type] = type_counts.get(msg_type, 0) + 1

        # Most active agents
        agent_activity: dict[str, int] = {}
        for msg in self.graph.messages.values():
            sender = msg.sender_id
            agent_activity[sender] = agent_activity.get(sender, 0) + 1

        most_active = sorted(agent_activity.items(), key=lambda x: x[1], reverse=True)[
            :5
        ]

        # Recent activity trend (messages per hour for last 24 hours)
        now = datetime.now()
        hourly_counts = []
        for i in range(24):
            hour_start = now - timedelta(hours=i + 1)
            hour_end = now - timedelta(hours=i)
            hour_count = sum(
                1
                for msg in self.graph.messages.values()
                if hour_start <= msg.timestamp < hour_end
            )
            hourly_counts.append(hour_count)

        return {
            "total_messages": total_messages,
            "message_type_distribution": type_counts,
            "most_active_agents": most_active,
            "hourly_activity_last_24h": list(reversed(hourly_counts)),
            "total_agents": len(self.graph.agent_registry),
            "active_threads": len(
                [t for t in self.graph.threads.values() if t.is_active]
            ),
            "communication_edges": len(self.graph.edges),
        }
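
Threads give the manager a grouped, topic-scoped view of a conversation. The sketch below strings together create_thread, add_message_to_thread, and get_conversation_history; the agent IDs are placeholders, and the coroutine must run inside an event loop:

import asyncio

async def kickoff_review(comm: CommunicationService) -> None:
    thread = comm.create_thread(
        participants=["manager", "agent_a", "agent_b"],
        topic="design review",
    )
    # to_agent=None fans the message out to the remaining thread participants.
    await comm.add_message_to_thread(
        thread_id=thread.thread_id,
        from_agent="manager",
        to_agent=None,
        content="Please review the draft architecture before timestep 5.",
    )
    history = comm.get_conversation_history("manager", "agent_a", limit=10)
    print(f"{len(history)} message(s) between manager and agent_a")

asyncio.run(kickoff_review(CommunicationService()))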

_notify_listeners(message: Message) -> None async

Notify all registered listeners about a new message.

Source code in manager_agent_gym/core/communication/service.py
async def _notify_listeners(self, message: Message) -> None:
    """Notify all registered listeners about a new message."""
    # Notify listeners for specific recipients
    for recipient in message.get_all_recipients():
        if recipient in self._message_listeners:
            for callback in self._message_listeners[recipient]:
                try:
                    if asyncio.iscoroutinefunction(callback):
                        await callback(message)
                    else:
                        callback(message)
                except Exception as e:
                    logger.error(f"Error notifying message listener: {e}")

    # Notify broadcast listeners if it's a broadcast
    if message.is_broadcast() and "BROADCAST" in self._message_listeners:
        for callback in self._message_listeners["BROADCAST"]:
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(message)
                else:
                    callback(message)
            except Exception as e:
                logger.error(f"Error notifying broadcast listener: {e}")

add_message_listener(agent_id: str, callback: Callable[[Message], Awaitable[None]] | Callable[[Message], None]) -> None async

Add a listener for new messages to an agent.

Parameters:

Name Type Description Default
agent_id str

The agent to listen for messages to

required
callback Callable[[Message], Awaitable[None]] | Callable[[Message], None]

Function to call when new messages arrive

required
Source code in manager_agent_gym/core/communication/service.py
async def add_message_listener(
    self,
    agent_id: str,
    callback: Callable[[Message], Awaitable[None]] | Callable[[Message], None],
) -> None:
    """
    Add a listener for new messages to an agent.

    Args:
        agent_id: The agent to listen for messages to
        callback: Function to call when new messages arrive
    """
    if agent_id not in self._message_listeners:
        self._message_listeners[agent_id] = []

    self._message_listeners[agent_id].append(callback)
    logger.info(f"Added message listener for agent {agent_id}")

add_message_to_thread(thread_id: UUID, from_agent: str, to_agent: str | None, content: str, message_type: MessageType = MessageType.DIRECT, priority: int = 1) -> Message async

Add a message into an existing thread (direct to one or multicast to participants).

Source code in manager_agent_gym/core/communication/service.py
async def add_message_to_thread(
    self,
    thread_id: UUID,
    from_agent: str,
    to_agent: str | None,
    content: str,
    message_type: MessageType = MessageType.DIRECT,
    priority: int = 1,
) -> Message:
    """Add a message into an existing thread (direct to one or multicast to participants)."""
    async with self._lock:
        # If recipient omitted, default to broadcast within thread participants (excluding sender)
        recipients: list[str] | None = None
        receiver_id: str | None = None
        if to_agent is None and thread_id in self.graph.threads:
            participants = list(self.graph.threads[thread_id].participants)
            if from_agent in participants:
                participants.remove(from_agent)
            recipients = participants
        else:
            receiver_id = to_agent

        related_task = (
            self.graph.threads[thread_id].related_task_id
            if thread_id in self.graph.threads
            else None
        )

        if not recipients:
            raise ValueError(
                "No recipients specified for sending a message to a thread."
            )

        message = Message(
            sender_id=from_agent,
            receiver_id=receiver_id,
            recipients=recipients,
            content=content,
            message_type=message_type,
            related_task_id=related_task,
            thread_id=thread_id,
            priority=priority,
        )

        self.graph.add_message(message)
        await self._notify_listeners(message)
        logger.info(
            f"Thread message sent: {from_agent} -> {to_agent or 'thread'} [{message_type.value}]"
        )
        return message
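
Example (illustrative only; `comms` and `thread` are assumed to already exist, with `thread` created via create_thread below). Passing to_agent=None multicasts to the thread's other participants:

msg = await comms.add_message_to_thread(
    thread_id=thread.thread_id,
    from_agent="manager",
    to_agent=None,  # None -> send to the remaining thread participants
    content="Please post your status for this thread.",
)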

broadcast_message(from_agent: str, content: str, message_type: MessageType = MessageType.BROADCAST, related_task_id: UUID | None = None, exclude_agents: list[str] | None = None, priority: int = 1) -> Message async

Broadcast a message to all agents in the system.

Parameters:

Name Type Description Default
from_agent str

ID of the sending agent

required
content str

Message content

required
message_type MessageType

Type of message

BROADCAST
related_task_id UUID | None

Optional task this message relates to

None
exclude_agents list[str] | None

Optional list of agents to exclude from broadcast

None
priority int

Message priority (1-5)

1

Returns:

Type Description
Message

The created Message object

Source code in manager_agent_gym/core/communication/service.py
async def broadcast_message(
    self,
    from_agent: str,
    content: str,
    message_type: MessageType = MessageType.BROADCAST,
    related_task_id: UUID | None = None,
    exclude_agents: list[str] | None = None,
    priority: int = 1,
) -> Message:
    """
    Broadcast a message to all agents in the system.

    Args:
        from_agent: ID of the sending agent
        content: Message content
        message_type: Type of message
        related_task_id: Optional task this message relates to
        exclude_agents: Optional list of agents to exclude from broadcast
        priority: Message priority (1-5)

    Returns:
        The created Message object
    """
    async with self._lock:
        # Get all known agents except sender and excluded agents
        all_agents = self.graph.agent_registry.copy()
        all_agents.discard(from_agent)

        if exclude_agents:
            for excluded in exclude_agents:
                all_agents.discard(excluded)

        message = Message(
            sender_id=from_agent,
            receiver_id=None,  # None indicates broadcast
            recipients=list(all_agents),
            content=content,
            message_type=message_type,
            related_task_id=related_task_id,
            priority=priority,
        )

        # Add to graph
        self.graph.add_message(message)

        # Notify listeners
        await self._notify_listeners(message)

        logger.info(
            f"Broadcast message sent: {from_agent} -> ALL "
            f"[{message_type.value}]: {content[:50]}... "
            f"({len(all_agents)} recipients)"
        )

        return message
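
Example (a usage sketch; assumes `comms` is the workflow's CommunicationService instance):

announcement = await comms.broadcast_message(
    from_agent="manager",
    content="Timestep complete; starting the review phase.",
    exclude_agents=["stakeholder"],  # optionally skip specific agents
    priority=2,
)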

create_thread(participants: list[str], topic: str | None = None, related_task_id: UUID | None = None) -> CommunicationThread

Create a new conversation thread.

Parameters:

Name Type Description Default
participants list[str]

List of agent IDs to include in the thread

required
topic str | None

Optional topic for the thread

None
related_task_id UUID | None

Optional task this thread relates to

None

Returns:

Type Description
CommunicationThread

The created CommunicationThread

Source code in manager_agent_gym/core/communication/service.py
def create_thread(
    self,
    participants: list[str],
    topic: str | None = None,
    related_task_id: UUID | None = None,
) -> CommunicationThread:
    """
    Create a new conversation thread.

    Args:
        participants: List of agent IDs to include in the thread
        topic: Optional topic for the thread
        related_task_id: Optional task this thread relates to

    Returns:
        The created CommunicationThread
    """
    thread = CommunicationThread(
        participants=set(participants), topic=topic, related_task_id=related_task_id
    )

    self.graph.threads[thread.thread_id] = thread

    logger.info(
        f"Created conversation thread {thread.thread_id} "
        f"with participants: {participants}"
    )

    return thread
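
Example (a minimal sketch; `comms` is assumed to be an existing CommunicationService). Note that create_thread is synchronous:

thread = comms.create_thread(
    participants=["manager", "worker_1", "worker_2"],
    topic="Design review",
)
print(thread.thread_id)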

get_agent_view(agent_id: str) -> dict[str, Any]

Get filtered communication view for a specific agent.

Parameters:

Name Type Description Default
agent_id str

The agent to get the view for

required

Returns:

Type Description
dict[str, Any]

Dictionary containing agent-specific communication data

Source code in manager_agent_gym/core/communication/service.py
def get_agent_view(self, agent_id: str) -> dict[str, Any]:
    """
    Get filtered communication view for a specific agent.

    Args:
        agent_id: The agent to get the view for

    Returns:
        Dictionary containing agent-specific communication data
    """
    # Get recent messages for this agent
    recent_messages = self.get_messages_for_agent(
        agent_id=agent_id, since=datetime.now() - timedelta(hours=2), limit=20
    )

    # Get agent statistics
    stats = self.graph.get_agent_communication_stats(agent_id)

    # Get active conversations
    active_conversations = {}
    for msg in recent_messages:
        if not msg.is_broadcast():
            other_agent = (
                msg.sender_id if msg.sender_id != agent_id else msg.receiver_id
            )
            if other_agent and other_agent not in active_conversations:
                active_conversations[other_agent] = self.get_conversation_history(
                    agent_id, other_agent, limit=5
                )

    return {
        "agent_id": agent_id,
        "recent_messages": [
            {
                "from": msg.sender_id,
                "content": msg.content,
                "type": msg.message_type.value,
                "timestamp": msg.timestamp.isoformat(),
                "task_id": str(msg.related_task_id)
                if msg.related_task_id
                else None,
            }
            for msg in recent_messages
        ],
        "statistics": stats,
        "active_conversations": {
            other_agent: [
                {
                    "from": msg.sender_id,
                    "content": msg.content,
                    "timestamp": msg.timestamp.isoformat(),
                }
                for msg in messages
            ]
            for other_agent, messages in active_conversations.items()
        },
    }
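
Example (sketch; `comms` assumed as above). The returned dictionary contains plain data, so it can be passed directly into an agent's observation:

view = comms.get_agent_view("worker_1")
print(view["statistics"])
print(len(view["recent_messages"]), "recent messages")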

get_all_messages() -> list[Message]

Get all messages in the communication system.

Useful for manager oversight and debugging/examples.

Returns:

Type Description
list[Message]

List of all messages sorted by timestamp (newest first)

Source code in manager_agent_gym/core/communication/service.py
def get_all_messages(self) -> list[Message]:
    """
    Get all messages in the communication system.

    Useful for manager oversight and debugging/examples.

    Returns:
        List of all messages sorted by timestamp (newest first)
    """
    all_messages = list(self.graph.messages.values())
    return sorted(all_messages, key=lambda m: m.timestamp, reverse=True)

get_all_messages_grouped(grouping: MessageGrouping = MessageGrouping.BY_SENDER, sort_within_group: str = 'time', include_broadcasts: bool = True) -> list[SenderMessagesView] | list[ThreadMessagesView]

Return a strongly-typed view of all messages grouped by sender or thread.

Parameters:

Name Type Description Default
grouping MessageGrouping

How to group messages (by sender or by thread).

BY_SENDER
sort_within_group str

Sort messages inside each group by "time" (default) or by "thread" id.

'time'
include_broadcasts bool

Whether to include broadcast messages in groups.

True

Returns:

Type Description
list[SenderMessagesView] | list[ThreadMessagesView]

A list of SenderMessagesView when grouping by sender, or a list of ThreadMessagesView when grouping by thread.

Source code in manager_agent_gym/core/communication/service.py
def get_all_messages_grouped(
    self,
    grouping: MessageGrouping = MessageGrouping.BY_SENDER,
    sort_within_group: str = "time",  # "time" or "thread"
    include_broadcasts: bool = True,
) -> list[SenderMessagesView] | list[ThreadMessagesView]:
    """
    Return a strongly-typed view of all messages grouped by sender or thread.

    Args:
        grouping: How to group messages (by sender or by thread).
        sort_within_group: Sort messages inside each group by "time" (default) or by "thread" id.
        include_broadcasts: Whether to include broadcast messages in groups.

    Returns:
        A list of `SenderMessagesView` when grouping by sender, or a list of
        `ThreadMessagesView` when grouping by thread.
    """
    messages = list(self.graph.messages.values())
    if not include_broadcasts:
        messages = [m for m in messages if not m.is_broadcast()]

    if grouping == MessageGrouping.BY_SENDER:
        by_sender: dict[str, list[Message]] = {}
        for msg in messages:
            by_sender.setdefault(msg.sender_id, []).append(msg)

        views: list[SenderMessagesView] = []
        for sender_id, msgs in by_sender.items():
            # Sort within group
            if sort_within_group == "thread":
                msgs.sort(
                    key=lambda m: (
                        str(m.thread_id) if m.thread_id else "",
                        m.timestamp,
                    )
                )
            else:
                msgs.sort(key=lambda m: m.timestamp)

            most_recent = max(msgs, key=lambda m: m.timestamp).timestamp
            views.append(
                SenderMessagesView(
                    sender_id=sender_id,
                    total_messages=len(msgs),
                    most_recent_at=most_recent,
                    messages=msgs,
                )
            )

        # Sort groups by most recent activity desc
        views.sort(key=lambda v: v.most_recent_at, reverse=True)
        return views

    # Grouping by thread
    by_thread: dict[str, list[Message]] = {}
    for msg in messages:
        key = str(msg.thread_id) if msg.thread_id else "NO_THREAD"
        by_thread.setdefault(key, []).append(msg)

    views_t: list[ThreadMessagesView] = []
    for key, msgs in by_thread.items():
        # Sort within group
        if sort_within_group == "thread":
            msgs.sort(
                key=lambda m: (str(m.thread_id) if m.thread_id else "", m.timestamp)
            )
        else:
            msgs.sort(key=lambda m: m.timestamp)

        last_activity = max(msgs, key=lambda m: m.timestamp).timestamp
        thread_uuid = msgs[0].thread_id if msgs and msgs[0].thread_id else None
        thread_meta = self.graph.threads.get(thread_uuid) if thread_uuid else None
        participants: list[str] = sorted(
            list(
                {m.sender_id for m in msgs}
                | set.union(*[m.get_all_recipients() for m in msgs])
                if msgs
                else set()
            )
        )
        views_t.append(
            ThreadMessagesView(
                thread_id=thread_uuid,
                topic=thread_meta.topic if thread_meta else None,
                participants=participants,
                related_task_id=thread_meta.related_task_id
                if thread_meta
                else None,
                total_messages=len(msgs),
                last_activity=last_activity,
                messages=msgs,
            )
        )

    # Sort groups by last activity desc; ensure NO_THREAD group comes last
    views_t.sort(key=lambda v: (v.last_activity, v.thread_id is None), reverse=True)
    return views_t
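
Example (sketch using the default BY_SENDER grouping so no extra imports are needed; `comms` assumed as above):

sender_views = comms.get_all_messages_grouped(include_broadcasts=False)
for view in sender_views:
    print(view.sender_id, view.total_messages, view.most_recent_at)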

get_communication_analytics() -> dict[str, Any]

Get analytics and insights about communication patterns.

Returns:

Type Description
dict[str, Any]

Dictionary containing communication analytics

Source code in manager_agent_gym/core/communication/service.py
def get_communication_analytics(self) -> dict[str, Any]:
    """
    Get analytics and insights about communication patterns.

    Returns:
        Dictionary containing communication analytics
    """
    total_messages = len(self.graph.messages)
    if total_messages == 0:
        return {"total_messages": 0, "analytics": "No messages to analyze"}

    # Message type distribution
    type_counts: dict[str, int] = {}
    for msg in self.graph.messages.values():
        msg_type = msg.message_type.value
        type_counts[msg_type] = type_counts.get(msg_type, 0) + 1

    # Most active agents
    agent_activity: dict[str, int] = {}
    for msg in self.graph.messages.values():
        sender = msg.sender_id
        agent_activity[sender] = agent_activity.get(sender, 0) + 1

    most_active = sorted(agent_activity.items(), key=lambda x: x[1], reverse=True)[
        :5
    ]

    # Recent activity trend (messages per hour for last 24 hours)
    now = datetime.now()
    hourly_counts = []
    for i in range(24):
        hour_start = now - timedelta(hours=i + 1)
        hour_end = now - timedelta(hours=i)
        hour_count = sum(
            1
            for msg in self.graph.messages.values()
            if hour_start <= msg.timestamp < hour_end
        )
        hourly_counts.append(hour_count)

    return {
        "total_messages": total_messages,
        "message_type_distribution": type_counts,
        "most_active_agents": most_active,
        "hourly_activity_last_24h": list(reversed(hourly_counts)),
        "total_agents": len(self.graph.agent_registry),
        "active_threads": len(
            [t for t in self.graph.threads.values() if t.is_active]
        ),
        "communication_edges": len(self.graph.edges),
    }
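
Example (sketch; `comms` assumed as above):

analytics = comms.get_communication_analytics()
print(analytics["total_messages"])
print(analytics["message_type_distribution"])
print(analytics["most_active_agents"])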

get_conversation_history(agent_id: str, other_agent: str, limit: int = 50) -> list[Message]

Get conversation history between two agents.

Parameters:

Name Type Description Default
agent_id str

First agent ID

required
other_agent str

Second agent ID

required
limit int

Maximum number of messages to return

50

Returns:

Type Description
list[Message]

List of messages in chronological order

Source code in manager_agent_gym/core/communication/service.py
def get_conversation_history(
    self, agent_id: str, other_agent: str, limit: int = 50
) -> list[Message]:
    """
    Get conversation history between two agents.

    Args:
        agent_id: First agent ID
        other_agent: Second agent ID
        limit: Maximum number of messages to return

    Returns:
        List of messages in chronological order
    """
    return self.graph.get_conversation_history(
        agent_id=agent_id, other_agent=other_agent, limit=limit
    )

get_manager_view() -> dict[str, Any]

Get complete communication overview for manager agent.

Provides full visibility into all communications for oversight.

Returns:

Type Description
dict[str, Any]

Dictionary containing comprehensive communication data

Source code in manager_agent_gym/core/communication/service.py
def get_manager_view(self) -> dict[str, Any]:
    """
    Get complete communication overview for manager agent.

    Provides full visibility into all communications for oversight.

    Returns:
        Dictionary containing comprehensive communication data
    """
    total_messages = len(self.graph.messages)
    active_threads = len([t for t in self.graph.threads.values() if t.is_active])

    # Recent activity summary
    recent_time = datetime.now() - timedelta(hours=1)
    recent_messages = [
        msg for msg in self.graph.messages.values() if msg.timestamp >= recent_time
    ]

    # Communication patterns
    agent_stats = {}
    for agent_id in self.graph.agent_registry:
        agent_stats[agent_id] = self.graph.get_agent_communication_stats(agent_id)

    return {
        "total_messages": total_messages,
        "active_threads": active_threads,
        "recent_messages_count": len(recent_messages),
        "known_agents": list(self.graph.agent_registry),
        "agent_statistics": agent_stats,
        "communication_edges": len(self.graph.edges),
        "recent_activity": [
            {
                "message_id": str(msg.message_id),
                "sender": msg.sender_id,
                "recipient": msg.receiver_id,
                "type": msg.message_type.value,
                "timestamp": msg.timestamp.isoformat(),
                "content_preview": msg.content[:100] + "..."
                if len(msg.content) > 100
                else msg.content,
            }
            for msg in sorted(
                recent_messages, key=lambda m: m.timestamp, reverse=True
            )[:20]
        ],
    }
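
Example (sketch; `comms` assumed as above):

overview = comms.get_manager_view()
print(overview["total_messages"], overview["active_threads"])
for entry in overview["recent_activity"]:
    print(entry["sender"], entry["content_preview"])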

get_messages_for_agent(agent_id: str, since: datetime | None = None, message_types: list[MessageType] | None = None, related_to_task: UUID | None = None, limit: int | None = None, include_broadcasts: bool = True) -> list[Message]

Get messages sent to a specific agent with filtering.

Parameters:

Name Type Description Default
agent_id str

The agent to get messages for

required
since datetime | None

Only return messages after this timestamp

None
message_types list[MessageType] | None

Only return messages of these types

None
related_to_task UUID | None

Only return messages related to this task

None
limit int | None

Maximum number of messages to return

None
include_broadcasts bool

Whether to include broadcast messages

True

Returns:

Type Description
list[Message]

List of messages sorted by timestamp (newest first)

Source code in manager_agent_gym/core/communication/service.py
def get_messages_for_agent(
    self,
    agent_id: str,
    since: datetime | None = None,
    message_types: list[MessageType] | None = None,
    related_to_task: UUID | None = None,
    limit: int | None = None,
    include_broadcasts: bool = True,
) -> list[Message]:
    """
    Get messages sent to a specific agent with filtering.

    Args:
        agent_id: The agent to get messages for
        since: Only return messages after this timestamp
        message_types: Only return messages of these types
        related_to_task: Only return messages related to this task
        limit: Maximum number of messages to return
        include_broadcasts: Whether to include broadcast messages

    Returns:
        List of messages sorted by timestamp (newest first)
    """
    messages = self.graph.get_messages_for_agent(
        agent_id=agent_id, since=since, message_types=message_types, limit=limit
    )

    # Additional filtering
    if related_to_task is not None:
        messages = [m for m in messages if m.related_task_id == related_to_task]

    if not include_broadcasts:
        messages = [m for m in messages if not m.is_broadcast()]

    return messages
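
Example (sketch; `comms` assumed as above):

from datetime import datetime, timedelta

recent = comms.get_messages_for_agent(
    agent_id="worker_1",
    since=datetime.now() - timedelta(hours=1),
    include_broadcasts=False,
    limit=10,
)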

get_messages_grouped_by_sender(sort_within_group: str = 'time', include_broadcasts: bool = True) -> list[SenderMessagesView]

Typed helper: return messages grouped by sender.

Source code in manager_agent_gym/core/communication/service.py
def get_messages_grouped_by_sender(
    self,
    sort_within_group: str = "time",
    include_broadcasts: bool = True,
) -> list[SenderMessagesView]:
    """Typed helper: return messages grouped by sender."""
    views = self.get_all_messages_grouped(
        grouping=MessageGrouping.BY_SENDER,
        sort_within_group=sort_within_group,
        include_broadcasts=include_broadcasts,
    )
    return cast(list[SenderMessagesView], views)

get_recent_broadcasts(since_minutes: int = 60, limit: int = 10) -> list[Message]

Get recent broadcast messages.

Parameters:

Name Type Description Default
since_minutes int

How many minutes back to look

60
limit int

Maximum number of broadcasts to return

10

Returns:

Type Description
list[Message]

List of recent broadcast messages

Source code in manager_agent_gym/core/communication/service.py
def get_recent_broadcasts(
    self, since_minutes: int = 60, limit: int = 10
) -> list[Message]:
    """
    Get recent broadcast messages.

    Args:
        since_minutes: How many minutes back to look
        limit: Maximum number of broadcasts to return

    Returns:
        List of recent broadcast messages
    """
    since_time = datetime.now() - timedelta(minutes=since_minutes)

    broadcasts = [
        msg
        for msg in self.graph.messages.values()
        if msg.is_broadcast() and msg.timestamp >= since_time
    ]

    # Sort by timestamp (newest first) and apply limit
    broadcasts.sort(key=lambda m: m.timestamp, reverse=True)
    return broadcasts[:limit] if limit else broadcasts
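
Example (sketch; `comms` assumed as above):

broadcasts = comms.get_recent_broadcasts(since_minutes=30, limit=5)
for msg in broadcasts:
    print(msg.sender_id, msg.content)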

get_task_communications(task_id: UUID) -> list[Message]

Get all communications related to a specific task.

Parameters:

Name Type Description Default
task_id UUID

The task ID to get communications for

required

Returns:

Type Description
list[Message]

List of task-related messages sorted by timestamp

Source code in manager_agent_gym/core/communication/service.py
def get_task_communications(self, task_id: UUID) -> list[Message]:
    """
    Get all communications related to a specific task.

    Args:
        task_id: The task ID to get communications for

    Returns:
        List of task-related messages sorted by timestamp
    """
    task_messages = [
        msg
        for msg in self.graph.messages.values()
        if msg.related_task_id == task_id
    ]

    # Sort by timestamp
    task_messages.sort(key=lambda m: m.timestamp)
    return task_messages
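
Example (sketch; `comms` assumed as above, and `task_uuid` is the UUID of an existing workflow task):

messages = comms.get_task_communications(task_id=task_uuid)
print(f"{len(messages)} messages reference this task")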

is_end_workflow_requested() -> bool

Return True if an end-of-workflow has been requested.

Source code in manager_agent_gym/core/communication/service.py
def is_end_workflow_requested(self) -> bool:
    """Return True if an end-of-workflow has been requested."""
    return self._end_workflow_requested

mark_message_read(message_id: UUID, agent_id: str) -> bool

Mark a message as read by an agent.

Parameters:

Name Type Description Default
message_id UUID

The message to mark as read

required
agent_id str

The agent who read the message

required

Returns:

Type Description
bool

True if successful, False if message not found

Source code in manager_agent_gym/core/communication/service.py
def mark_message_read(self, message_id: UUID, agent_id: str) -> bool:
    """
    Mark a message as read by an agent.

    Args:
        message_id: The message to mark as read
        agent_id: The agent who read the message

    Returns:
        True if successful, False if message not found
    """
    if message_id in self.graph.messages:
        self.graph.messages[message_id].mark_read_by(agent_id)
        return True
    return False
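
Example (sketch; `comms` and `msg` assumed to exist, with `msg` a previously delivered Message):

if comms.mark_message_read(message_id=msg.message_id, agent_id="worker_1"):
    print("Marked as read")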

request_end_workflow(reason: str | None = None) -> None

Signal that the current workflow should end as soon as possible.

Source code in manager_agent_gym/core/communication/service.py
def request_end_workflow(self, reason: str | None = None) -> None:
    """Signal that the current workflow should end as soon as possible."""
    self._end_workflow_requested = True
    if reason:
        logger.warning(f"End workflow requested: {reason}")
    else:
        logger.warning("End workflow requested")

send_direct_message(from_agent: str, to_agent: str, content: str, message_type: MessageType = MessageType.DIRECT, related_task_id: UUID | None = None, thread_id: UUID | None = None, priority: int = 1) -> Message async

Send a direct message between two agents.

Parameters:

Name Type Description Default
from_agent str

ID of the sending agent

required
to_agent str

ID of the receiving agent

required
content str

Message content

required
message_type MessageType

Type of message

DIRECT
related_task_id UUID | None

Optional task this message relates to

None
thread_id UUID | None

Optional conversation thread

None
priority int

Message priority (1-5)

1

Returns:

Type Description
Message

The created Message object

Source code in manager_agent_gym/core/communication/service.py
async def send_direct_message(
    self,
    from_agent: str,
    to_agent: str,
    content: str,
    message_type: MessageType = MessageType.DIRECT,
    related_task_id: UUID | None = None,
    thread_id: UUID | None = None,
    priority: int = 1,
) -> Message:
    """
    Send a direct message between two agents.

    Args:
        from_agent: ID of the sending agent
        to_agent: ID of the receiving agent
        content: Message content
        message_type: Type of message
        related_task_id: Optional task this message relates to
        thread_id: Optional conversation thread
        priority: Message priority (1-5)

    Returns:
        The created Message object
    """
    async with self._lock:
        message = Message(
            sender_id=from_agent,
            receiver_id=to_agent,
            content=content,
            message_type=message_type,
            related_task_id=related_task_id,
            thread_id=thread_id,
            priority=priority,
        )

        # Add to graph
        self.graph.add_message(message)

        # Notify listeners
        await self._notify_listeners(message)

        logger.info(
            f"Direct message sent: {from_agent} -> {to_agent} "
            f"[{message_type.value}]: {content[:50]}..."
        )

        return message
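
Example (sketch; `comms` assumed as above):

msg = await comms.send_direct_message(
    from_agent="manager",
    to_agent="worker_1",
    content="Please pick up the data-cleaning task next.",
    priority=3,
)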

send_multicast_message(from_agent: str, to_agents: list[str], content: str, message_type: MessageType = MessageType.DIRECT, related_task_id: UUID | None = None, thread_id: UUID | None = None, priority: int = 1) -> Message async

Send a message to multiple specific agents.

Parameters:

Name Type Description Default
from_agent str

ID of the sending agent

required
to_agents list[str]

List of recipient agent IDs

required
content str

Message content

required
message_type MessageType

Type of message

DIRECT
related_task_id UUID | None

Optional task this message relates to

None
thread_id UUID | None

Optional conversation thread

None
priority int

Message priority (1-5)

1

Returns:

Type Description
Message

The created Message object

Source code in manager_agent_gym/core/communication/service.py
async def send_multicast_message(
    self,
    from_agent: str,
    to_agents: list[str],
    content: str,
    message_type: MessageType = MessageType.DIRECT,
    related_task_id: UUID | None = None,
    thread_id: UUID | None = None,
    priority: int = 1,
) -> Message:
    """
    Send a message to multiple specific agents.

    Args:
        from_agent: ID of the sending agent
        to_agents: List of recipient agent IDs
        content: Message content
        message_type: Type of message
        related_task_id: Optional task this message relates to
        thread_id: Optional conversation thread
        priority: Message priority (1-5)

    Returns:
        The created Message object
    """
    async with self._lock:
        message = Message(
            sender_id=from_agent,
            receiver_id=to_agents[0] if to_agents else None,
            recipients=to_agents,
            content=content,
            message_type=message_type,
            related_task_id=related_task_id,
            thread_id=thread_id,
            priority=priority,
        )

        # Add to graph
        self.graph.add_message(message)

        # Notify listeners
        await self._notify_listeners(message)

        logger.info(
            f"Multicast message sent: {from_agent} -> {to_agents} "
            f"[{message_type.value}]: {content[:50]}..."
        )

        return message
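
Example (sketch; `comms` assumed as above):

msg = await comms.send_multicast_message(
    from_agent="manager",
    to_agents=["worker_1", "worker_2"],
    content="Sync on shared dependencies before the next timestep.",
)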