Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ private static async Task Main()
var checkpoints = new List<CheckpointInfo>();

// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
Expand Down Expand Up @@ -72,10 +72,10 @@ private static async Task Main()
Console.WriteLine($"\n\nHydrating a new workflow instance from the {CheckpointIndex + 1}th checkpoint.");
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];

await using Checkpointed<StreamingRun> newCheckpointedRun =
await using StreamingRun newCheckpointedRun =
await InProcessExecution.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager);

await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in newCheckpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ private static async Task Main()
var checkpoints = new List<CheckpointInfo>();

// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager)
;
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
Expand Down Expand Up @@ -71,7 +71,7 @@ private static async Task Main()
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
if (evt is ExecutorCompletedEvent executorCompletedEvt)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ private static async Task Main()
var checkpoints = new List<CheckpointInfo>();

// Execute the workflow and save checkpoints
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
await using StreamingRun checkpointedRun = await InProcessExecution
.StreamAsync(workflow, new SignalWithNumber(NumberSignal.Init), checkpointManager)
;
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
await checkpointedRun.SendResponseAsync(response);
break;
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"* Executor {executorCompletedEvt.ExecutorId} completed.");
Expand Down Expand Up @@ -77,14 +77,14 @@ private static async Task Main()
CheckpointInfo savedCheckpoint = checkpoints[CheckpointIndex];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
await checkpointedRun.SendResponseAsync(response);
break;
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"* Executor {executorCompletedEvt.ExecutorId} completed.");
Expand Down
4 changes: 4 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/CheckpointManager.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;
Expand Down Expand Up @@ -54,4 +55,7 @@ ValueTask<CheckpointInfo> ICheckpointManager.CommitCheckpointAsync(string runId,

ValueTask<Checkpoint> ICheckpointManager.LookupCheckpointAsync(string runId, CheckpointInfo checkpointInfo)
=> this._impl.LookupCheckpointAsync(runId, checkpointInfo);

ValueTask<IEnumerable<CheckpointInfo>> ICheckpointManager.RetrieveIndexAsync(string runId, CheckpointInfo? withParent)
=> this._impl.RetrieveIndexAsync(runId, withParent);
}
49 changes: 49 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/CheckpointableRunBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Checkpointing;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Represents a base object for a workflow run that may support checkpointing.
/// </summary>
public abstract class CheckpointableRunBase
{
// TODO: Rename Context?
private readonly ICheckpointingHandle _checkpointingHandle;

internal CheckpointableRunBase(ICheckpointingHandle checkpointingHandle)
{
this._checkpointingHandle = checkpointingHandle;
}

/// <inheritdoc cref="ICheckpointingHandle.IsCheckpointingEnabled"/>
public bool IsCheckpointingEnabled => this._checkpointingHandle.IsCheckpointingEnabled;

/// <inheritdoc cref="ICheckpointingHandle.Checkpoints"/>
public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpointingHandle.Checkpoints ?? [];

/// <summary>
/// Gets the most recent checkpoint information.
/// </summary>
public CheckpointInfo? LastCheckpoint
{
get
{
if (!this.IsCheckpointingEnabled)
{
return null;
}

var checkpoints = this.Checkpoints;
return checkpoints.Count > 0 ? checkpoints[checkpoints.Count - 1] : null;
}
}

/// <inheritdoc cref="ICheckpointingHandle.RestoreCheckpointAsync"/>
public ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
=> this._checkpointingHandle.RestoreCheckpointAsync(checkpointInfo, cancellationToken);
}
66 changes: 0 additions & 66 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointed.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows.Checkpointing;
Expand Down Expand Up @@ -27,4 +28,7 @@ public async ValueTask<Checkpoint> LookupCheckpointAsync(string runId, Checkpoin
TStoreObject result = await this._store.RetrieveCheckpointAsync(runId, checkpointInfo).ConfigureAwait(false);
return this._marshaller.Marshal<Checkpoint>(result);
}

public ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
=> this._store.RetrieveIndexAsync(runId, withParent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,16 @@ internal interface ICheckpointManager
/// cref="Checkpoint"/> associated with the specified <paramref name="checkpointInfo"/>.</returns>
/// <exception cref="KeyNotFoundException">Thrown if the checkpoint is not found.</exception>
ValueTask<Checkpoint> LookupCheckpointAsync(string runId, CheckpointInfo checkpointInfo);

/// <summary>
/// Asynchronously retrieves the collection of checkpoint information for the specified run identifier, optionally
/// filtered by a parent checkpoint.
/// </summary>
/// <param name="runId">The unique identifier of the run for which to retrieve checkpoint information. Cannot be null or empty.</param>
/// <param name="withParent">An optional parent checkpoint to filter the results. If specified, only checkpoints with the given parent are
/// returned; otherwise, all checkpoints for the run are included.</param>
/// <returns>A value task representing the asynchronous operation. The result contains a collection of <see
/// cref="CheckpointInfo"/> objects associated with the specified run. The collection is empty if no checkpoints are
/// found.</returns>
ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,21 @@ namespace Microsoft.Agents.AI.Workflows.Checkpointing;

internal interface ICheckpointingHandle
{
// TODO: Convert this to a multi-timeline (e.g.: Live timeline + forks for orphaned checkpoints due to timetravel)
/// <summary>
/// Gets a value indicating whether checkpointing is enabled for the current operation or process.
/// </summary>
bool IsCheckpointingEnabled { get; }

/// <summary>
/// Gets a read-only list of checkpoint information associated with the current context.
/// </summary>
IReadOnlyList<CheckpointInfo> Checkpoints { get; }

/// <summary>
/// Restores the system state from the specified checkpoint asynchronously.
/// </summary>
/// <param name="checkpointInfo">The checkpoint information that identifies the state to restore. Cannot be null.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the restore operation.</param>
/// <returns>A <see cref="ValueTask"/> that represents the asynchronous restore operation.</returns>
ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ public ValueTask<Checkpoint> LookupCheckpointAsync(string runId, CheckpointInfo

public bool TryGetLastCheckpoint(string runId, [NotNullWhen(true)] out CheckpointInfo? checkpoint)
=> this.GetRunStore(runId).TryGetLastCheckpointInfo(out checkpoint);

public ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string runId, CheckpointInfo? withParent = null)
=> new(this.GetRunStore(runId).CheckpointIndex.AsReadOnly());
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ internal AsyncRunHandle(ISuperStepRunner stepRunner, ICheckpointingHandle checkp

public string RunId => this._stepRunner.RunId;

public bool IsCheckpointingEnabled => this._checkpointingHandle.IsCheckpointingEnabled;

public IReadOnlyList<CheckpointInfo> Checkpoints => this._checkpointingHandle.Checkpoints;

public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows.Execution;

internal static class AsyncRunHandleExtensions
{
public static async ValueTask<Checkpointed<TRunType>> WithCheckpointingAsync<TRunType>(this AsyncRunHandle runHandle, Func<ValueTask<TRunType>> prepareFunc)
{
TRunType run = await prepareFunc().ConfigureAwait(false);
return new Checkpointed<TRunType>(run, runHandle);
}

public static async ValueTask<StreamingRun> EnqueueAndStreamAsync<TInput>(this AsyncRunHandle runHandle, TInput input, CancellationToken cancellationToken = default)
{
await runHandle.EnqueueMessageAsync(input, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Agents.AI.Workflows.Execution;

internal interface ISuperStepJoinContext
{
bool WithCheckpointing { get; }
bool IsCheckpointingEnabled { get; }
bool ConcurrentRunsEnabled { get; }

ValueTask ForwardWorkflowEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
Expand Down
Loading
Loading