Skip to content

Execution I/O

Execution I/O

Utilities for serialising workflow runs and restoring state snapshots.

Workflow Serialiser

Persists timestep data, metrics, and artifacts for later analysis.

Source code in manager_agent_gym/core/execution/output_writer.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
class WorkflowSerialiser:
    """Persist timestep results, workflow snapshots, execution logs, and
    evaluation outputs for a simulation run.

    All public ``save_*`` methods are best-effort: serialisation failures
    are logged via ``logger.error`` rather than raised, so output problems
    never abort the simulation itself.
    """

    def __init__(
        self,
        output_config: OutputConfig,
        communication_service: CommunicationService,
        workflow: Workflow,
    ) -> None:
        self.output_config = output_config
        self.communication_service = communication_service
        self.workflow = workflow

    def ensure_directories(self) -> None:
        """Create all configured output directories; log (don't raise) on failure."""
        try:
            self.output_config.ensure_directories_exist()
        except Exception:
            logger.error("failed to create output directories", exc_info=True)

    def _serialize_agents_for_snapshot(
        self,
        workflow: Workflow | None = None,
        include_state: bool = False,
    ) -> list[dict[str, Any]]:
        """Serialise workflow agents into JSON-safe dicts.

        Args:
            workflow: Workflow whose agents to serialise; defaults to the
                workflow bound at construction time (backward compatible
                with the original zero-argument form).
            include_state: When True, also capture each agent's dynamic
                runtime fields under a ``"state"`` key.

        Each static attribute is read defensively so one misbehaving agent
        cannot prevent the others from being serialised.
        """
        source = workflow if workflow is not None else self.workflow
        serialized: list[dict[str, Any]] = []
        for agent in source.agents.values():
            try:
                agent_id = agent.agent_id
            except Exception:
                agent_id = "unknown"
            try:
                agent_type = agent.agent_type
            except Exception:
                agent_type = "unknown"
            try:
                agent_config_json = agent.config.model_dump(mode="json")  # type: ignore[attr-defined]
            except Exception:
                agent_config_json = None

            entry: dict[str, Any] = {
                "agent_id": agent_id,
                "agent_type": agent_type,
                "config": agent_config_json,
            }
            if include_state:
                entry["state"] = {
                    "is_available": agent.is_available,
                    "current_task_ids": [str(t) for t in agent.current_task_ids],
                    "tasks_completed": agent.tasks_completed,
                    "max_concurrent_tasks": agent.max_concurrent_tasks,
                }
            serialized.append(entry)
        return serialized

    def save_timestep(
        self,
        timestep_result: ExecutionResult,
        workflow: Workflow,
        current_timestep: int,
        manager_agent: ManagerAgent | None,
        stakeholder_weights: PreferenceWeights | None,
    ) -> None:
        """Write timestep result and a full workflow snapshot for this timestep.

        Args:
            timestep_result: Execution result for the timestep (pydantic model).
            workflow: Workflow whose state is snapshotted.
            current_timestep: Zero-based timestep index used in file names.
            manager_agent: Optional manager whose recent action briefs are
                captured in the snapshot.
            stakeholder_weights: Optional preference weights captured in both
                manager and stakeholder snapshots.
        """
        # 1) Write timestep result
        try:
            filepath = self.output_config.get_timestep_file_path(current_timestep)
            filepath.parent.mkdir(parents=True, exist_ok=True)
            with open(filepath, "w", encoding="utf-8") as f:
                data = timestep_result.model_dump(mode="json")
                # Add headline cumulative hours for convenience in per-timestep files
                try:
                    data["cumulative_workflow_hours"] = float(
                        workflow.total_simulated_hours
                    )
                except Exception:
                    data["cumulative_workflow_hours"] = "FAILED_TO_CALCULATE"
                json.dump(data, f, indent=2, default=str)
        except Exception:
            logger.error("failed writing timestep result", exc_info=True)

        # 2) Build agent list with dynamic state (shared helper keeps the
        # shape consistent with save_workflow_summary)
        try:
            agents_serialized = self._serialize_agents_for_snapshot(
                workflow, include_state=True
            )

            # Manager snapshot (safe subset)
            manager_state: dict[str, Any] | None = None
            try:
                if manager_agent is not None:
                    recent_briefs = manager_agent.get_action_buffer(10)
                    try:
                        recent_serialized = [
                            b.model_dump(mode="json") for b in recent_briefs
                        ]
                    except Exception:
                        recent_serialized = []
                    manager_state = {
                        "agent_id": manager_agent.agent_id,
                        "recent_action_briefs": recent_serialized,
                        "current_preference_weights": (
                            stakeholder_weights.get_preference_dict()
                            if stakeholder_weights is not None
                            else None
                        ),
                    }
            except Exception:
                manager_state = None

            # Stakeholder snapshot (safe subset)
            try:
                stakeholder_state = (
                    {
                        "timestep": current_timestep,
                        "weights": stakeholder_weights.get_preference_dict(),
                        "preference_names": stakeholder_weights.get_preference_names(),
                    }
                    if stakeholder_weights is not None
                    else None
                )
            except Exception:
                stakeholder_state = None

            workflow_snapshot = {
                **workflow.model_dump(
                    mode="json", exclude={"agents", "success_criteria"}
                ),
                "agents": agents_serialized,
                "success_criteria": [],
                "timestep": current_timestep,
                "manager_state": manager_state,
                "stakeholder_state": stakeholder_state,
                # Explicit headline figure for convenience
                "cumulative_workflow_hours": (
                    float(workflow.total_simulated_hours)
                    if isinstance(workflow.total_simulated_hours, (int, float))
                    else "FAILED_TO_CALCULATE"
                ),
            }

            wf_dir = self.output_config.workflow_dir
            if wf_dir is not None:
                try:
                    wf_dir.mkdir(parents=True, exist_ok=True)
                except Exception:
                    # Ignore mkdir failures; open() below surfaces real problems.
                    pass
                snapshot_name = f"workflow_execution_{self.output_config.run_id}_t{current_timestep:04d}.json"
                with open(wf_dir / snapshot_name, "w", encoding="utf-8") as wf_f:
                    json.dump(workflow_snapshot, wf_f, indent=2, default=str)
        except Exception:
            logger.error("failed writing per-timestep workflow snapshot", exc_info=True)

    def save_workflow_summary(
        self,
        workflow: Workflow,
        completed_task_ids: set,
        failed_task_ids: set,
        current_timestep: int,
    ) -> None:
        """Write final workflow snapshot into workflow_outputs directory.

        Args:
            workflow: Workflow to summarise.
            completed_task_ids: IDs of tasks that completed.
            failed_task_ids: IDs of tasks that failed.
            current_timestep: Number of timesteps executed.
        """
        try:
            # Static agent info only; dynamic per-timestep state is captured
            # by save_timestep.
            agents_serialized = self._serialize_agents_for_snapshot(workflow)

            snapshot = {
                **workflow.model_dump(mode="json", exclude={"agents"}),
                "agents": agents_serialized,
                "is_complete": workflow.is_complete(),
                "total_tasks": len(workflow.tasks),
                "completed_tasks": len(completed_task_ids),
                "failed_tasks": len(failed_task_ids),
                "timesteps": current_timestep,
                # Headline cumulative hours across all tasks (simulated)
                "cumulative_workflow_hours": (
                    float(workflow.total_simulated_hours)
                    if isinstance(workflow.total_simulated_hours, (int, float))
                    else "FAILED_TO_CALCULATE"
                ),
            }
            path = self.output_config.get_workflow_summary_path()
            with open(path, "w", encoding="utf-8") as f:
                json.dump(snapshot, f, indent=2, default=str)
        except Exception:
            logger.error("save_workflow_summary failed", exc_info=True)

    def save_execution_logs(
        self, manager_action_history: Sequence[tuple[int, ActionResult | None]]
    ) -> None:
        """Write manager actions into execution_logs directory.

        Args:
            manager_action_history: Pairs of (timestep, action result); a
                ``None`` action records that no action was taken.
        """
        try:
            run_id = self.output_config.run_id or "run"
            exec_dir = self.output_config.execution_logs_dir
            if exec_dir is None:
                return
            try:
                exec_dir.mkdir(parents=True, exist_ok=True)
            except Exception:
                pass
            path = exec_dir / f"execution_log_{run_id}.json"

            actions: list[dict[str, Any]] = []
            for ts, act in manager_action_history:
                try:
                    if act is None:
                        actions.append(
                            {"timestep": ts, "action_type": None, "action": None}
                        )
                    else:
                        actions.append(
                            {
                                "timestep": ts,
                                "action_type": act.action_type,
                                "action": act.model_dump(mode="json"),
                            }
                        )
                except Exception:
                    # Unserialisable action degrades to a null entry rather
                    # than losing the timestep.
                    actions.append(
                        {"timestep": ts, "action_type": None, "action": None}
                    )

            payload = {"run_id": run_id, "manager_actions": actions}
            with open(path, "w", encoding="utf-8") as f:
                json.dump(payload, f, indent=2, default=str)
        except Exception:
            logger.error("save_execution_logs failed", exc_info=True)

    def save_evaluation_outputs(
        self, evaluation_results: list[Any], reward_vector: list[float] | None = None
    ) -> None:
        """Save evaluation history, reward vector, and final evaluation into evaluation_outputs directory.

        Args:
            evaluation_results: Chronological evaluation objects; each must
                support ``model_dump(mode="json")``.
            reward_vector: Optional reward values stored alongside the
                history; ``None`` is written as an empty list.
        """
        ev_dir = self.output_config.evaluation_dir
        # A missing evaluation directory means evaluation output is disabled.
        if ev_dir is None:
            return
        try:
            ev_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            pass

        # Save full history
        try:
            history_path = self.output_config.get_evaluation_results_path(
                timestamp=self.output_config.run_id
            )
            with open(history_path, "w", encoding="utf-8") as f:
                history_payload = {
                    "evaluation_history": [
                        er.model_dump(mode="json") for er in evaluation_results
                    ],
                    "reward_vector": list(reward_vector or []),
                }
                json.dump(history_payload, f, indent=2, default=str)
        except Exception:
            logger.error("failed saving evaluation history", exc_info=True)

        # Save final only
        try:
            if evaluation_results:
                final_eval = evaluation_results[-1]
                final_path = (
                    ev_dir / f"final_evaluation_{self.output_config.run_id}.json"
                )
                with open(final_path, "w", encoding="utf-8") as f:
                    json.dump(
                        final_eval.model_dump(mode="json"), f, indent=2, default=str
                    )
        except Exception:
            logger.error("failed saving final evaluation", exc_info=True)

save_evaluation_outputs(evaluation_results: list[Any], reward_vector: list[float] | None = None) -> None

Save evaluation history, reward vector, and final evaluation into evaluation_outputs directory.

Source code in manager_agent_gym/core/execution/output_writer.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def save_evaluation_outputs(
    self, evaluation_results: list[Any], reward_vector: list[float] | None = None
) -> None:
    """Save evaluation history, reward vector, and final evaluation into evaluation_outputs directory.

    Args:
        evaluation_results: Chronological evaluation objects; each must
            support ``model_dump(mode="json")``.
        reward_vector: Optional reward values stored alongside the history;
            ``None`` is written as an empty list.

    Best-effort: each write is individually guarded and failures are logged
    rather than raised.
    """
    ev_dir = self.output_config.evaluation_dir
    # A missing evaluation directory means evaluation output is disabled.
    if ev_dir is None:
        return
    try:
        ev_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Ignore mkdir failures here; open() below surfaces real problems.
        pass

    # Save full history
    try:
        history_path = self.output_config.get_evaluation_results_path(
            timestamp=self.output_config.run_id
        )
        with open(history_path, "w") as f:
            history_payload = {
                "evaluation_history": [
                    er.model_dump(mode="json") for er in evaluation_results
                ],
                "reward_vector": list(reward_vector or []),
            }
            # default=str stringifies anything model_dump left non-serialisable
            json.dump(history_payload, f, indent=2, default=str)
    except Exception:
        logger.error("failed saving evaluation history", exc_info=True)

    # Save final only (most recent evaluation, written to its own file)
    try:
        if evaluation_results:
            final_eval = evaluation_results[-1]
            final_path = (
                ev_dir / f"final_evaluation_{self.output_config.run_id}.json"
            )
            with open(final_path, "w") as f:
                json.dump(
                    final_eval.model_dump(mode="json"), f, indent=2, default=str
                )
    except Exception:
        logger.error("failed saving final evaluation", exc_info=True)

save_execution_logs(manager_action_history: Sequence[tuple[int, ActionResult | None]]) -> None

Write manager actions into execution_logs directory.

Source code in manager_agent_gym/core/execution/output_writer.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def save_execution_logs(
    self, manager_action_history: Sequence[tuple[int, ActionResult | None]]
) -> None:
    """Write manager actions into execution_logs directory.

    Args:
        manager_action_history: Pairs of (timestep, action result); a
            ``None`` action records that no action was taken that timestep.

    Best-effort: any failure is logged rather than raised.
    """
    try:
        run_id = self.output_config.run_id or "run"
        exec_dir = self.output_config.execution_logs_dir
        # No configured log directory means execution logging is disabled.
        if exec_dir is None:
            return
        try:
            exec_dir.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Ignore mkdir failures; open() below surfaces real problems.
            pass
        path = exec_dir / f"execution_log_{run_id}.json"

        actions: list[dict[str, Any]] = []
        for ts, act in manager_action_history:
            try:
                if act is None:
                    actions.append(
                        {"timestep": ts, "action_type": None, "action": None}
                    )
                else:
                    actions.append(
                        {
                            "timestep": ts,
                            "action_type": act.action_type,
                            "action": act.model_dump(mode="json"),
                        }
                    )
            except Exception:
                # Unserialisable action degrades to a null entry rather than
                # dropping the timestep from the log.
                actions.append(
                    {"timestep": ts, "action_type": None, "action": None}
                )

        payload = {"run_id": run_id, "manager_actions": actions}
        with open(path, "w") as f:
            json.dump(payload, f, indent=2, default=str)
    except Exception:
        logger.error("save_execution_logs failed", exc_info=True)

save_timestep(timestep_result: ExecutionResult, workflow: Workflow, current_timestep: int, manager_agent: ManagerAgent | None, stakeholder_weights: PreferenceWeights | None) -> None

Write timestep result and a full workflow snapshot for this timestep.

Source code in manager_agent_gym/core/execution/output_writer.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def save_timestep(
    self,
    timestep_result: ExecutionResult,
    workflow: Workflow,
    current_timestep: int,
    manager_agent: ManagerAgent | None,
    stakeholder_weights: PreferenceWeights | None,
) -> None:
    """Write timestep result and a full workflow snapshot for this timestep.

    Args:
        timestep_result: Execution result for the timestep (pydantic model).
        workflow: Workflow whose state is snapshotted.
        current_timestep: Zero-based timestep index used in file names.
        manager_agent: Optional manager whose recent action briefs are
            captured in the snapshot.
        stakeholder_weights: Optional preference weights captured in both
            the manager and stakeholder snapshots.

    Best-effort: each of the two writes is independently guarded; failures
    are logged rather than raised.
    """
    # 1) Write timestep result
    try:
        filepath = self.output_config.get_timestep_file_path(current_timestep)
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, "w") as f:
            data = timestep_result.model_dump(mode="json")
            # Add headline cumulative hours for convenience in per-timestep files
            try:
                data["cumulative_workflow_hours"] = float(
                    workflow.total_simulated_hours
                )
            except Exception:
                # Sentinel string keeps the key present even when the
                # hours figure cannot be computed.
                data["cumulative_workflow_hours"] = "FAILED_TO_CALCULATE"
            json.dump(data, f, indent=2, default=str)
    except Exception:
        logger.error("failed writing timestep result", exc_info=True)

    # 2) Build agent list with dynamic state
    try:
        agents_serialized: list[dict[str, Any]] = []
        for agent in workflow.agents.values():
            # Each static attribute is read defensively so one misbehaving
            # agent cannot prevent the others from being serialised.
            try:
                agent_id = agent.agent_id
            except Exception:
                agent_id = "unknown"
            try:
                agent_type = agent.agent_type
            except Exception:
                agent_type = "unknown"
            try:
                agent_config_json = agent.config.model_dump(mode="json")  # type: ignore[attr-defined]
            except Exception:
                agent_config_json = None

            dynamic_fields = {
                "is_available": agent.is_available,
                "current_task_ids": [str(t) for t in agent.current_task_ids],
                "tasks_completed": agent.tasks_completed,
                "max_concurrent_tasks": agent.max_concurrent_tasks,
            }

            agents_serialized.append(
                {
                    "agent_id": agent_id,
                    "agent_type": agent_type,
                    "config": agent_config_json,
                    "state": dynamic_fields,
                }
            )

        # Manager snapshot (safe subset)
        manager_state: dict[str, Any] | None = None
        try:
            if manager_agent is not None:
                # Capture only the 10 most recent action briefs.
                recent_briefs = manager_agent.get_action_buffer(10)
                try:
                    recent_serialized = [
                        b.model_dump(mode="json") for b in recent_briefs
                    ]
                except Exception:
                    recent_serialized = []
                manager_state = {
                    "agent_id": manager_agent.agent_id,
                    "recent_action_briefs": recent_serialized,
                    "current_preference_weights": (
                        stakeholder_weights.get_preference_dict()
                        if stakeholder_weights is not None
                        else None
                    ),
                }
        except Exception:
            manager_state = None

        # Stakeholder snapshot (safe subset)
        try:
            stakeholder_state = (
                {
                    "timestep": current_timestep,
                    "weights": stakeholder_weights.get_preference_dict(),
                    "preference_names": stakeholder_weights.get_preference_names(),
                }
                if stakeholder_weights is not None
                else None
            )
        except Exception:
            stakeholder_state = None

        # Full workflow dump with agents/success_criteria replaced by the
        # safe serialisations built above.
        workflow_snapshot = {
            **workflow.model_dump(
                mode="json", exclude={"agents", "success_criteria"}
            ),
            "agents": agents_serialized,
            "success_criteria": [],
            "timestep": current_timestep,
            "manager_state": manager_state,
            "stakeholder_state": stakeholder_state,
            # Explicit headline figure for convenience
            "cumulative_workflow_hours": (
                float(workflow.total_simulated_hours)
                if isinstance(workflow.total_simulated_hours, (int, float))
                else "FAILED_TO_CALCULATE"
            ),
        }

        wf_dir = self.output_config.workflow_dir
        if wf_dir is not None:
            try:
                wf_dir.mkdir(parents=True, exist_ok=True)
            except Exception:
                # Ignore mkdir failures; open() below surfaces real problems.
                pass
            snapshot_name = f"workflow_execution_{self.output_config.run_id}_t{current_timestep:04d}.json"
            with open(wf_dir / snapshot_name, "w") as wf_f:
                json.dump(workflow_snapshot, wf_f, indent=2, default=str)
    except Exception:
        logger.error("failed writing per-timestep workflow snapshot", exc_info=True)

save_workflow_summary(workflow: Workflow, completed_task_ids: set, failed_task_ids: set, current_timestep: int) -> None

Write final workflow snapshot into workflow_outputs directory.

Source code in manager_agent_gym/core/execution/output_writer.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def save_workflow_summary(
    self,
    workflow: Workflow,
    completed_task_ids: set,
    failed_task_ids: set,
    current_timestep: int,
) -> None:
    """Write final workflow snapshot into workflow_outputs directory.

    Args:
        workflow: Workflow to summarise.
        completed_task_ids: IDs of tasks that completed.
        failed_task_ids: IDs of tasks that failed.
        current_timestep: Number of timesteps executed.

    Best-effort: any failure is logged rather than raised.
    """
    try:
        agents_serialized = []
        for agent in workflow.agents.values():
            # Each attribute is read defensively so one misbehaving agent
            # cannot prevent the others from being serialised.
            try:
                agent_id = agent.agent_id
            except Exception:
                agent_id = "unknown"
            try:
                agent_type = agent.agent_type
            except Exception:
                agent_type = "unknown"
            try:
                agent_config_json = agent.config.model_dump(mode="json")  # type: ignore[attr-defined]
            except Exception:
                agent_config_json = None

            agents_serialized.append(
                {
                    "agent_id": agent_id,
                    "agent_type": agent_type,
                    "config": agent_config_json,
                }
            )

        # Full workflow dump with "agents" replaced by the safe
        # serialisation above, plus summary counters.
        snapshot = {
            **workflow.model_dump(mode="json", exclude={"agents"}),
            "agents": agents_serialized,
            "is_complete": workflow.is_complete(),
            "total_tasks": len(workflow.tasks),
            "completed_tasks": len(completed_task_ids),
            "failed_tasks": len(failed_task_ids),
            "timesteps": current_timestep,
            # Headline cumulative hours across all tasks (simulated)
            "cumulative_workflow_hours": (
                float(workflow.total_simulated_hours)
                if isinstance(workflow.total_simulated_hours, (int, float))
                else "FAILED_TO_CALCULATE"
            ),
        }
        path = self.output_config.get_workflow_summary_path()
        with open(path, "w") as f:
            json.dump(snapshot, f, indent=2, default=str)
    except Exception:
        logger.error("save_workflow_summary failed", exc_info=True)

Workflow State Restorer

Loads saved checkpoints back into an executable workflow state.

Handles restoration of complete workflow state from simulation snapshots.

This class encapsulates all the logic needed to rebuild:

- Workflow task states (status, costs, durations, assignments)
- Workflow resource states (content, descriptions, artifacts)
- Stakeholder preferences
- Communication message history
- Manager agent action buffer
- Active agent registry
- Agent workload assignments

Source code in manager_agent_gym/core/execution/state_restorer.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
class WorkflowStateRestorer:
    """
    Handles restoration of complete workflow state from simulation snapshots.

    This class encapsulates all the logic needed to rebuild:
    - Workflow task states (status, costs, durations, assignments)
    - Workflow resource states (content, descriptions, artifacts)
    - Stakeholder preferences
    - Communication message history
    - Manager agent action buffer
    - Active agent registry
    - Agent workload assignments

    Typical usage: construct with a run directory and target timestep,
    call ``load_snapshot_data()`` once, then invoke the individual
    ``restore_*`` methods against the live objects to be rebuilt.
    """

    def __init__(self, snapshot_dir: str, timestep: int):
        """
        Initialize the state restorer for a specific snapshot.

        Args:
            snapshot_dir: Path to simulation run directory
            timestep: Target timestep to restore from
        """
        self.snapshot_dir = Path(snapshot_dir)
        self.timestep = timestep
        # Both caches stay empty until load_snapshot_data() populates them.
        self.timestep_data: dict[str, Any] = {}
        self.execution_log_data: dict[str, Any] = {}

    def load_snapshot_data(self) -> None:
        """Load all necessary snapshot data files.

        Raises:
            FileNotFoundError: if the timestep snapshot file is missing.
                A missing execution log is tolerated (warning only).
        """
        # Load timestep snapshot
        timestep_file = (
            self.snapshot_dir / "timestep_data" / f"timestep_{self.timestep:04d}.json"
        )
        if not timestep_file.exists():
            raise FileNotFoundError(f"Timestep file not found: {timestep_file}")

        with open(timestep_file, "r") as f:
            self.timestep_data = json.load(f)

        # Load execution log for manager action buffer. The log filename
        # embeds a run id, taken here as the last "_"-separated token of
        # the run directory's name.
        execution_log_file = (
            self.snapshot_dir
            / "execution_logs"
            / f"execution_log_{self.snapshot_dir.name.split('_')[-1]}.json"
        )
        if execution_log_file.exists():
            with open(execution_log_file, "r") as f:
                self.execution_log_data = json.load(f)
        else:
            logger.warning("Execution log not found: %s", execution_log_file)

    def restore_workflow_state(self, workflow) -> None:
        """Update workflow task and resource states from snapshot.

        Mutates ``workflow`` in place: task fields, the resource
        registry, and workflow-level totals/timestamps/flags.
        """
        workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]

        # Update task states. Only task ids already present in the live
        # registry are touched; snapshot-only ids are silently ignored.
        tasks_data = workflow_snapshot.get("tasks", {})
        for task_id_str, task_data in tasks_data.items():
            task_id = UUID(task_id_str)
            if task_id in workflow.tasks:
                task = workflow.tasks[task_id]
                # Update key state information
                task.status = TaskStatus(task_data["status"])
                task.assigned_agent_id = task_data.get("assigned_agent_id")
                task.actual_duration_hours = task_data.get("actual_duration_hours")
                task.actual_cost = task_data.get("actual_cost")
                task.quality_score = task_data.get("quality_score")
                # Timestamps are serialized as ISO-8601 strings.
                if task_data.get("started_at"):
                    task.started_at = datetime.fromisoformat(task_data["started_at"])
                if task_data.get("completed_at"):
                    task.completed_at = datetime.fromisoformat(
                        task_data["completed_at"]
                    )
                task.execution_notes = task_data.get("execution_notes", [])

        # Synchronize embedded subtasks with the updated registry to fix status inconsistencies
        for task in workflow.tasks.values():
            task.sync_embedded_tasks_with_registry(workflow.tasks)

        # Restore resources from snapshot; an existing resource with the
        # same id is overwritten unconditionally.
        resources_data = workflow_snapshot.get("resources", {})
        logger.info("Restoring %s resources from snapshot", len(resources_data))

        # Import Resource here to avoid circular imports
        from ...schemas.core.resources import Resource

        for resource_id_str, resource_data in resources_data.items():
            resource_id = UUID(resource_id_str)
            resource = Resource(
                id=resource_id,
                name=resource_data["name"],
                description=resource_data["description"],
                content=resource_data.get("content"),
                content_type=resource_data.get("content_type", "text/plain"),
            )
            workflow.resources[resource_id] = resource

        # Update workflow-level state
        workflow.total_cost = workflow_snapshot.get("total_cost", 0.0)
        if workflow_snapshot.get("started_at"):
            workflow.started_at = datetime.fromisoformat(
                workflow_snapshot["started_at"]
            )
        if workflow_snapshot.get("completed_at"):
            workflow.completed_at = datetime.fromisoformat(
                workflow_snapshot["completed_at"]
            )
        workflow.is_active = workflow_snapshot.get("is_active", True)

    def restore_stakeholder_preferences(self, stakeholder_agent) -> None:
        """Update stakeholder agent preferences to match snapshot."""
        prefs_data = self.timestep_data["metadata"]["stakeholder_preference_state"]

        # Rebuild a PreferenceWeights object from the serialized
        # name -> weight mapping and apply it at the restore timestep.
        stakeholder_agent.apply_preference_change(
            timestep=self.timestep,
            new_weights=PreferenceWeights(
                preferences=[
                    Preference(name=key, weight=value)
                    for key, value in prefs_data["weights"].items()
                ]
            ),
            change_event=None,
        )

    def restore_communication_history(self, communication_service) -> None:
        """Restore communication message history from snapshot."""
        # Messages are stored in manager_observation.recent_messages
        # NOTE(review): only the manager observation's recent-message
        # window is available here — confirm it covers the full history
        # callers expect to be restored.
        manager_observation = self.timestep_data["metadata"]["manager_observation"]
        messages = manager_observation.get("recent_messages", [])

        logger.info("Restoring %s messages from snapshot", len(messages))

        for i, msg_data in enumerate(messages):
            try:
                # Convert snapshot data to Message object
                message = Message(
                    message_id=UUID(msg_data["message_id"]),
                    sender_id=msg_data["sender_id"],
                    receiver_id=msg_data.get("receiver_id"),
                    recipients=msg_data.get("recipients", []),
                    content=msg_data["content"],
                    message_type=MessageType(msg_data["message_type"]),
                    timestamp=datetime.fromisoformat(msg_data["timestamp"]),
                    thread_id=UUID(msg_data["thread_id"])
                    if msg_data.get("thread_id")
                    else None,
                    parent_message_id=UUID(msg_data["parent_message_id"])
                    if msg_data.get("parent_message_id")
                    else None,
                    related_task_id=UUID(msg_data["related_task_id"])
                    if msg_data.get("related_task_id")
                    else None,
                    priority=msg_data.get("priority", 1),
                    read_by={
                        agent_id: datetime.fromisoformat(read_time)
                        for agent_id, read_time in msg_data.get("read_by", {}).items()
                    },
                    metadata=msg_data.get("metadata", {}),
                )

                # Add message to communication service
                communication_service.graph.add_message(message)

                logger.debug(
                    "Restored message %s: %s -> %s...",
                    i + 1,
                    msg_data["sender_id"],
                    msg_data["content"][:50],
                )

            except Exception as e:
                # A single malformed message must not abort restoration.
                logger.error(
                    "Failed to restore message %s: %s", i + 1, e, exc_info=True
                )
                continue

        logger.info(
            "Successfully restored %s messages",
            len(communication_service.graph.messages),
        )

    def restore_manager_action_buffer(self, manager_agent) -> None:
        """Restore manager agent action buffer from execution logs."""
        if not self.execution_log_data:
            logger.warning(
                "No execution log data available for manager action buffer restoration"
            )
            return

        manager_actions = self.execution_log_data.get("manager_actions", [])
        logger.info("Found %s manager actions in execution log", len(manager_actions))

        # Filter actions up to our target timestep
        actions_to_restore = [
            action
            for action in manager_actions
            if action.get("timestep", 0) <= self.timestep
        ]

        logger.info(
            "Restoring %s manager actions up to timestep %s",
            len(actions_to_restore),
            self.timestep,
        )

        restored_count = 0
        for entry in actions_to_restore:
            try:
                payload = entry.get("action")
                if not payload:
                    continue
                # Reconstruct ActionResult directly from serialized payload
                restored = ActionResult.model_validate(payload)
                # Backfill the timestep from the log entry when the
                # serialized payload omitted it.
                if restored.timestep is None:
                    restored.timestep = entry.get("timestep")
                manager_agent.record_action(restored)
                restored_count += 1
            except Exception as e:
                logger.debug(
                    "Skipping invalid manager action entry during restoration: %s",
                    e,
                    exc_info=True,
                )
                continue

        logger.info("Restored %s manager actions into manager buffer", restored_count)

    def restore_active_agents(self, agent_registry: AgentRegistry) -> None:
        """Restore active agent states from snapshot.

        Currently only logs the agents found in the snapshot; pushing
        them back into ``agent_registry`` is not yet implemented.
        """
        workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]
        agents_data = workflow_snapshot.get("agents", [])

        logger.info("Restoring %s active agents from snapshot", len(agents_data))

        for agent_data in agents_data:
            agent_id = agent_data.get("agent_id")
            agent_type = agent_data.get("agent_type", "ai")

            if (
                agent_id and agent_id != "stakeholder_balanced"
            ):  # Skip stakeholder agent
                logger.debug(
                    "Active agent from snapshot: %s (%s)", agent_id, agent_type
                )
                # TODO: Properly restore agent states to registry
                # This depends on AgentRegistry interface

    def get_agent_workloads(self) -> dict[str, list[str]]:
        """Extract current task assignments per agent from workflow state.

        Returns:
            Mapping of agent id to "<task name> (<status>)" labels;
            unassigned tasks are skipped.
        """
        workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]
        tasks_data = workflow_snapshot.get("tasks", {})

        workloads: dict[str, list[str]] = {}

        for task_id, task_data in tasks_data.items():
            assigned_agent = task_data.get("assigned_agent_id")
            if assigned_agent:
                if assigned_agent not in workloads:
                    workloads[assigned_agent] = []
                workloads[assigned_agent].append(
                    f"{task_data.get('name', task_id)} ({task_data.get('status')})"
                )

        return workloads

    def get_simulated_time_state(self) -> dict[str, Any]:
        """Extract simulated time information from snapshot.

        Returns:
            Dict with simulated duration, wall-clock execution time,
            the target timestep, and the completion timestamp (if any).
        """
        return {
            "simulated_duration_hours": self.timestep_data.get(
                "simulated_duration_hours", 0.0
            ),
            "execution_time_seconds": self.timestep_data.get(
                "execution_time_seconds", 0.0
            ),
            "timestep": self.timestep,
            "completed_at": self.timestep_data.get("completed_at"),
        }

get_agent_workloads() -> dict[str, list[str]]

Extract current task assignments per agent from workflow state.

Source code in manager_agent_gym/core/execution/state_restorer.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_agent_workloads(self) -> dict[str, list[str]]:
    """Extract current task assignments per agent from workflow state.

    Returns:
        Mapping of agent id to human-readable task labels of the form
        "<task name> (<status>)". Tasks without an assigned agent are
        skipped; when a task has no name, its id string is used.
    """
    workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]
    tasks_data = workflow_snapshot.get("tasks", {})

    workloads: dict[str, list[str]] = {}

    for task_id, task_data in tasks_data.items():
        assigned_agent = task_data.get("assigned_agent_id")
        if not assigned_agent:
            continue
        # setdefault replaces the manual membership-check-then-append idiom.
        workloads.setdefault(assigned_agent, []).append(
            f"{task_data.get('name', task_id)} ({task_data.get('status')})"
        )

    return workloads

get_simulated_time_state() -> dict[str, Any]

Extract simulated time information from snapshot.

Source code in manager_agent_gym/core/execution/state_restorer.py
287
288
289
290
291
292
293
294
295
296
297
298
def get_simulated_time_state(self) -> dict[str, Any]:
    """Summarise the snapshot's simulated-time information.

    Returns:
        Dict containing simulated duration (hours), wall-clock execution
        time (seconds), the target timestep, and the completion stamp.
    """
    data = self.timestep_data
    state: dict[str, Any] = {
        "simulated_duration_hours": data.get("simulated_duration_hours", 0.0),
        "execution_time_seconds": data.get("execution_time_seconds", 0.0),
        "timestep": self.timestep,
        "completed_at": data.get("completed_at"),
    }
    return state

load_snapshot_data() -> None

Load all necessary snapshot data files.

Source code in manager_agent_gym/core/execution/state_restorer.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def load_snapshot_data(self) -> None:
    """Read the timestep snapshot (required) and execution log (optional).

    Raises:
        FileNotFoundError: if the timestep snapshot file is absent.
    """
    # The timestep snapshot is mandatory — fail fast when it is missing.
    timestep_file = (
        self.snapshot_dir / "timestep_data" / f"timestep_{self.timestep:04d}.json"
    )
    if not timestep_file.exists():
        raise FileNotFoundError(f"Timestep file not found: {timestep_file}")
    self.timestep_data = json.loads(timestep_file.read_text())

    # The execution log (manager action buffer) is best-effort; its
    # filename embeds the last "_"-separated token of the run dir name.
    run_suffix = self.snapshot_dir.name.split("_")[-1]
    execution_log_file = (
        self.snapshot_dir / "execution_logs" / f"execution_log_{run_suffix}.json"
    )
    if execution_log_file.exists():
        self.execution_log_data = json.loads(execution_log_file.read_text())
    else:
        logger.warning("Execution log not found: %s", execution_log_file)

restore_active_agents(agent_registry: AgentRegistry) -> None

Restore active agent states from snapshot.

Source code in manager_agent_gym/core/execution/state_restorer.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def restore_active_agents(self, agent_registry: AgentRegistry) -> None:
    """Restore active agent states from snapshot."""
    workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]
    agents_data = workflow_snapshot.get("agents", [])

    logger.info("Restoring %s active agents from snapshot", len(agents_data))

    for agent_data in agents_data:
        agent_id = agent_data.get("agent_id")
        agent_type = agent_data.get("agent_type", "ai")

        if (
            agent_id and agent_id != "stakeholder_balanced"
        ):  # Skip stakeholder agent
            logger.debug(
                "Active agent from snapshot: %s (%s)", agent_id, agent_type
            )

restore_communication_history(communication_service) -> None

Restore communication message history from snapshot.

Source code in manager_agent_gym/core/execution/state_restorer.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def restore_communication_history(self, communication_service) -> None:
    """Rebuild the communication graph from the snapshot's message list.

    Messages come from the manager observation's ``recent_messages``;
    entries that fail to deserialise are logged and skipped.
    """

    def _as_uuid(value):
        # Optional UUID fields arrive as strings, or are absent/None.
        return UUID(value) if value else None

    manager_observation = self.timestep_data["metadata"]["manager_observation"]
    messages = manager_observation.get("recent_messages", [])

    logger.info("Restoring %s messages from snapshot", len(messages))

    for position, msg_data in enumerate(messages, start=1):
        try:
            restored = Message(
                message_id=UUID(msg_data["message_id"]),
                sender_id=msg_data["sender_id"],
                receiver_id=msg_data.get("receiver_id"),
                recipients=msg_data.get("recipients", []),
                content=msg_data["content"],
                message_type=MessageType(msg_data["message_type"]),
                timestamp=datetime.fromisoformat(msg_data["timestamp"]),
                thread_id=_as_uuid(msg_data.get("thread_id")),
                parent_message_id=_as_uuid(msg_data.get("parent_message_id")),
                related_task_id=_as_uuid(msg_data.get("related_task_id")),
                priority=msg_data.get("priority", 1),
                read_by={
                    reader: datetime.fromisoformat(stamp)
                    for reader, stamp in msg_data.get("read_by", {}).items()
                },
                metadata=msg_data.get("metadata", {}),
            )

            communication_service.graph.add_message(restored)

            logger.debug(
                "Restored message %s: %s -> %s...",
                position,
                msg_data["sender_id"],
                msg_data["content"][:50],
            )

        except Exception as e:
            # One bad message must not abort the whole restoration.
            logger.error(
                "Failed to restore message %s: %s", position, e, exc_info=True
            )
            continue

    logger.info(
        "Successfully restored %s messages",
        len(communication_service.graph.messages),
    )

restore_manager_action_buffer(manager_agent) -> None

Restore manager agent action buffer from execution logs.

Source code in manager_agent_gym/core/execution/state_restorer.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def restore_manager_action_buffer(self, manager_agent) -> None:
    """Replay logged manager actions (up to the target timestep) into
    the manager agent's action buffer."""
    if not self.execution_log_data:
        logger.warning(
            "No execution log data available for manager action buffer restoration"
        )
        return

    logged_actions = self.execution_log_data.get("manager_actions", [])
    logger.info("Found %s manager actions in execution log", len(logged_actions))

    # Only actions recorded at or before the restore point are eligible.
    eligible = [
        item for item in logged_actions if item.get("timestep", 0) <= self.timestep
    ]

    logger.info(
        "Restoring %s manager actions up to timestep %s",
        len(eligible),
        self.timestep,
    )

    restored_total = 0
    for item in eligible:
        try:
            payload = item.get("action")
            if not payload:
                continue
            # Deserialize straight back into an ActionResult model.
            action = ActionResult.model_validate(payload)
            if action.timestep is None:
                # Backfill from the log entry when the payload omits it.
                action.timestep = item.get("timestep")
            manager_agent.record_action(action)
            restored_total += 1
        except Exception as e:
            # Malformed entries are skipped rather than failing the run.
            logger.debug(
                "Skipping invalid manager action entry during restoration: %s",
                e,
                exc_info=True,
            )
            continue

    logger.info("Restored %s manager actions into manager buffer", restored_total)

restore_stakeholder_preferences(stakeholder_agent) -> None

Update stakeholder agent preferences to match snapshot.

Source code in manager_agent_gym/core/execution/state_restorer.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def restore_stakeholder_preferences(self, stakeholder_agent) -> None:
    """Apply the snapshot's preference weights to the stakeholder agent."""
    prefs_data = self.timestep_data["metadata"]["stakeholder_preference_state"]

    # Rebuild Preference objects from the serialized name -> weight map.
    restored_prefs = [
        Preference(name=key, weight=value)
        for key, value in prefs_data["weights"].items()
    ]
    stakeholder_agent.apply_preference_change(
        timestep=self.timestep,
        new_weights=PreferenceWeights(preferences=restored_prefs),
        change_event=None,
    )

restore_workflow_state(workflow) -> None

Update workflow task and resource states from snapshot.

Source code in manager_agent_gym/core/execution/state_restorer.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def restore_workflow_state(self, workflow) -> None:
    """Update workflow task and resource states from snapshot."""
    workflow_snapshot = self.timestep_data["metadata"]["workflow_snapshot"]

    # Update task states
    tasks_data = workflow_snapshot.get("tasks", {})
    for task_id_str, task_data in tasks_data.items():
        task_id = UUID(task_id_str)
        if task_id in workflow.tasks:
            task = workflow.tasks[task_id]
            # Update key state information
            task.status = TaskStatus(task_data["status"])
            task.assigned_agent_id = task_data.get("assigned_agent_id")
            task.actual_duration_hours = task_data.get("actual_duration_hours")
            task.actual_cost = task_data.get("actual_cost")
            task.quality_score = task_data.get("quality_score")
            if task_data.get("started_at"):
                task.started_at = datetime.fromisoformat(task_data["started_at"])
            if task_data.get("completed_at"):
                task.completed_at = datetime.fromisoformat(
                    task_data["completed_at"]
                )
            task.execution_notes = task_data.get("execution_notes", [])

    # Synchronize embedded subtasks with the updated registry to fix status inconsistencies
    for task in workflow.tasks.values():
        task.sync_embedded_tasks_with_registry(workflow.tasks)

    # Restore resources from snapshot
    resources_data = workflow_snapshot.get("resources", {})
    logger.info("Restoring %s resources from snapshot", len(resources_data))

    # Import Resource here to avoid circular imports
    from ...schemas.core.resources import Resource

    for resource_id_str, resource_data in resources_data.items():
        resource_id = UUID(resource_id_str)
        resource = Resource(
            id=resource_id,
            name=resource_data["name"],
            description=resource_data["description"],
            content=resource_data.get("content"),
            content_type=resource_data.get("content_type", "text/plain"),
        )
        workflow.resources[resource_id] = resource

    # Update workflow-level state
    workflow.total_cost = workflow_snapshot.get("total_cost", 0.0)
    if workflow_snapshot.get("started_at"):
        workflow.started_at = datetime.fromisoformat(
            workflow_snapshot["started_at"]
        )
    if workflow_snapshot.get("completed_at"):
        workflow.completed_at = datetime.fromisoformat(
            workflow_snapshot["completed_at"]
        )
    workflow.is_active = workflow_snapshot.get("is_active", True)