State & Checkpoints Done
Ensure Jordan's triage assembly line never loses data by managing shared state and checkpointing execution.
ℹ️ Note on Scope
This module is intended to provide a conceptual overview and ensure completeness of the orchestration pillars. A detailed, hands-on walkthrough of state management and checkpointing implementation is part of a separate, specialized learning path.
Overview
Jordan Miller’s triage workflow is highly effective, but Taylor Vance has a new concern: “What happens if the service restarts while the agent is reasoning? Or what if multiple executors need access to the same giant telemetry file without passing it through every single edge?”
In real-world applications, managing data and recovering from interruptions are critical. Without proper state isolation, data can leak between different workflow executions. Without checkpoints, a crashed process means starting the triage all over again.
In this module, we will implement Shared State and Checkpoints to make Jordan’s workflow resilient and production-ready.
System Anatomy
Graph execution.
Processing units.
Message routing.
Observability.
Resiliency.
Service boundary.
Managing Workflow State
State allows multiple executors within a workflow to access and modify common data without relying on direct message passing. This is essential when passing massive payloads (like a 50MB log file) between executors is inefficient.
🛡️ State Isolation
When building workflows, it’s critical to ensure State Isolation. If you reuse the same WorkflowBuilder or Executor instances across multiple requests, their mutable internal state will be shared, leading to data corruption. Always create a new workflow instance (and new executors) for each request, or ensure your executors implement IResettableExecutor.
Writing and Reading State
Executors can safely interact with the shared workflow context using QueueStateUpdateAsync and ReadStateAsync.
internal sealed class FetchTelemetryExecutor() : Executor<string, string>("FetchTelemetry")
{
public override async ValueTask<string> HandleAsync(string service, IWorkflowContext context, CancellationToken ct = default)
{
string rawLogs = "[Massive log payload...]";
string logId = Guid.NewGuid().ToString("N");
// Store the large payload in shared state instead of passing it downstream
await context.QueueStateUpdateAsync(logId, rawLogs, scopeName: "IncidentLogs", ct);
return logId; // Only pass the ID to the next executor
}
}
internal sealed class TriageAgentExecutor() : Executor<string, string>("TriageAgent")
{
public override async ValueTask<string> HandleAsync(string logId, IWorkflowContext context, CancellationToken ct = default)
{
// Retrieve the payload using the ID
var rawLogs = await context.ReadStateAsync<string>(logId, scopeName: "IncidentLogs", ct);
return "PRIORITY: P0";
}
}
Checkpointing Execution
Checkpoints allow you to save the state of a workflow at specific points during its execution and resume from those points later. This is vital for long-running workflows to avoid losing progress in case of failures.
Checkpoints are created at the end of each superstep (after all executors in that step finish). A checkpoint captures:
- The current state of all executors
- Pending messages for the next superstep
- Shared states
1 Setup the Checkpoint Manager
To enable checkpointing, you must provide a CheckpointManager when running the workflow. The Agent Framework provides multiple storage providers depending on your durability needs:
- In-Memory: Keeps checkpoints in process memory (best for local testing).
- File: Persists checkpoints to a local directory (survives process restarts).
- Azure Cosmos DB: Persists to NoSQL (best for production/distributed systems).
Here is how Jordan updates the Program.cs execution to use an in-memory checkpoint manager:
using Microsoft.Agents.AI.Workflows;
// 1. Create a checkpoint manager
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// 2. Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, "checkout failures", checkpointManager);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent stepCompleted)
{
// A checkpoint is successfully saved after each superstep
CheckpointInfo? cp = stepCompleted.CompletionInfo?.Checkpoint;
ConsoleTheme.Muted($"[CHECKPOINT] Step complete. Saved state: {cp?.CheckpointId}");
}
}
2 Making Executors Checkpoint-Aware
To ensure that an executor’s custom internal state is saved, it must override OnCheckpointingAsync. To restore that state, it overrides OnCheckpointRestoredAsync.
internal sealed partial class TriageSyncExecutor() : Executor<string, string>("TriageSync")
{
private const string StateKey = "TriageSyncState";
private int _messagesProcessed = 0;
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken ct = default)
{
_messagesProcessed++;
return ValueTask.FromResult("Processed");
}
// Save custom state to the checkpoint
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken ct = default)
{
return context.QueueStateUpdateAsync(StateKey, _messagesProcessed, ct: ct);
}
// Restore custom state from the checkpoint
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken ct = default)
{
_messagesProcessed = await context.ReadStateAsync<int>(StateKey, ct: ct);
}
}
Resuming and Rehydrating
If Taylor’s server crashes during an incident, they don’t want to re-fetch telemetry. They want to resume from the exact last saved superstep.
Resuming on the Same Instance
You can resume a paused workflow directly if you still have the StreamingRun object in memory:
// Get a previous checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints.Last();
// Restore and continue execution
await run.RestoreCheckpointAsync(savedCheckpoint);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// ... handling resumed events
}
Rehydrating into a New Instance
If the server restarted, the workflow object is gone, but the checkpoint data persists (if using File or Cosmos DB storage). You can rehydrate the workflow entirely:
// In a new process/server...
// (Assuming a FileCheckpointStorage implementation exists or similar approach)
// CheckpointManager manager = CheckpointManager.CreateFileStorage("/path/to/checkpoints");
// CheckpointInfo lastCp = await manager.GetLatestCheckpointAsync("IncidentTriage");
// Re-build the workflow topology
// Workflow newWorkflow = builder.Build();
// Resume execution from the loaded checkpoint
// StreamingRun resumedRun = await InProcessExecution
// .ResumeStreamingAsync(newWorkflow, lastCp, manager);
⚠️ Security Reminder
Checkpoint storage is a trust boundary. Never load checkpoints from untrusted or potentially tampered sources. Ensure the storage location (File or Cosmos DB) is secured with appropriate access controls.
Summary & Next Steps
You’ve shown Jordan and Taylor how to build a resilient, stateful automation system. By leveraging Shared State for heavy payloads and Checkpoints for fault tolerance, the triage workflow is now robust enough for production.
In the next tutorial, we will take this resilient assembly line and host it as a scalable API using Azure Functions.