Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Configuration for a child context.
/// </summary>
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (and overloads) to run code inside one. Every setting on this type is
/// optional; leaving a property <c>null</c> disables the corresponding behavior.
/// </remarks>
public sealed class ChildContextConfig
{
/// <summary>
/// Operation sub-type label for observability (e.g. <c>"WaitForCallback"</c>).
/// Surfaces on the wire <c>OperationUpdate.SubType</c> field.
/// </summary>
/// <remarks>
/// The same label is attached to the <c>START</c>, <c>SUCCEED</c> and
/// <c>FAIL</c> updates of the child context and is reported on
/// <see cref="ChildContextException.SubType"/> when the child context fails
/// (on replay, a sub-type recorded on the checkpointed operation takes
/// precedence over this value).
/// </remarks>
public string? SubType { get; set; }

/// <summary>
/// Optional function to transform the failure of a child context before it
/// surfaces to the caller. Useful for wrapping low-level errors into
/// domain-specific exceptions.
/// </summary>
/// <remarks>
/// <para>
/// Applied when the user function throws (the mapped exception propagates
/// to the caller of <c>RunInChildContextAsync</c>) and on replay of a
/// <c>FAILED</c> child context (the constructed
/// <see cref="ChildContextException"/> is mapped before being thrown).
/// </para>
/// <para>
/// The function is always handed a <see cref="ChildContextException"/>, not
/// the raw exception. When the user function has just thrown, the raw
/// exception is available through <see cref="Exception.InnerException"/>;
/// on replay the exception is rebuilt from the checkpointed error and has no
/// inner exception, so mappers should rely on
/// <see cref="ChildContextException.ErrorType"/> and the message rather than
/// on the inner exception's runtime type.
/// </para>
/// <para>
/// If the function returns <c>null</c>, the unmapped
/// <see cref="ChildContextException"/> is thrown instead.
/// </para>
/// </remarks>
public Func<Exception, Exception>? ErrorMapping { get; set; }
}
49 changes: 49 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,55 @@ public Task WaitAsync(
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

/// <summary>
/// Runs <paramref name="func"/> inside a child context and returns its result.
/// </summary>
/// <typeparam name="T">Type of the value produced by the child context.</typeparam>
/// <param name="func">User function executed with the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name for the child context operation.</param>
/// <param name="config">Optional child context settings.</param>
/// <param name="cancellationToken">Token forwarded to the underlying operation.</param>
/// <returns>A task that yields the child context's result.</returns>
/// <exception cref="ArgumentNullException"><paramref name="func"/> is <c>null</c>.</exception>
public Task<T> RunInChildContextAsync<T>(
    Func<IDurableContext, Task<T>> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // Reject a null delegate before any operation ID is consumed or any
    // checkpoint is written. Without this guard the NullReferenceException
    // raised when the delegate is invoked is treated like a user failure and
    // persisted as a permanent CONTEXT FAIL.
    if (func is null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    return RunChildContext(func, name, config, cancellationToken);
}

/// <summary>
/// Runs <paramref name="func"/> inside a child context that produces no value.
/// </summary>
/// <param name="func">User function executed with the child <see cref="IDurableContext"/>.</param>
/// <param name="name">Optional name for the child context operation.</param>
/// <param name="config">Optional child context settings.</param>
/// <param name="cancellationToken">Token forwarded to the underlying operation.</param>
/// <returns>A task that completes when the child context has finished.</returns>
/// <exception cref="ArgumentNullException"><paramref name="func"/> is <c>null</c>.</exception>
public async Task RunInChildContextAsync(
    Func<IDurableContext, Task> func,
    string? name = null,
    ChildContextConfig? config = null,
    CancellationToken cancellationToken = default)
{
    // The delegate is wrapped in an adapter lambda below, so a null value
    // would otherwise only be discovered when the adapter runs — at which
    // point the NullReferenceException is handled like a user failure and
    // checkpointed as a permanent CONTEXT FAIL. Fail fast instead.
    if (func is null)
    {
        throw new ArgumentNullException(nameof(func));
    }

    // Void child contexts don't carry a meaningful payload; the wrapper
    // returns null so the registered ILambdaSerializer is never asked to
    // serialize a real value.
    await RunChildContext<object?>(
        async ctx =>
        {
            await func(ctx);
            return null;
        },
        name, config, cancellationToken);
}

/// <summary>
/// Shared implementation behind the <c>RunInChildContextAsync</c> overloads:
/// resolves the serializer, allocates the operation ID, and hands execution
/// to a <see cref="ChildContextOperation{T}"/>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// No serializer is available on <c>LambdaContext.Serializer</c>.
/// </exception>
private Task<T> RunChildContext<T>(
    Func<IDurableContext, Task<T>> func,
    string? name,
    ChildContextConfig? config,
    CancellationToken cancellationToken)
{
    // The serializer is checked first so that a misconfigured host fails
    // before an operation ID is taken from the generator.
    var lambdaSerializer = LambdaContext.Serializer;
    if (lambdaSerializer is null)
    {
        throw new InvalidOperationException(
            "No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
            "Register a serializer via LambdaBootstrapBuilder.Create(handler, serializer) " +
            "(or in tests, set TestLambdaContext.Serializer).");
    }

    var childOperationId = _idGenerator.NextId();

    // The child context reuses this context's state, termination manager,
    // batcher, ARN, and Lambda context. Only the ID generator differs: it is
    // derived from the parent operation ID so the child's operation IDs are
    // deterministically namespaced beneath it.
    Func<string, IDurableContext> createChildContext = parentOpId =>
        new DurableContext(
            _state,
            _terminationManager,
            _idGenerator.CreateChild(parentOpId),
            _durableExecutionArn,
            LambdaContext,
            _batcher);

    var operation = new ChildContextOperation<T>(
        childOperationId, name, func, config, lambdaSerializer, createChildContext,
        _state, _terminationManager, _durableExecutionArn, _batcher);

    return operation.ExecuteAsync(cancellationToken);
}
}

internal sealed class DurableExecutionContext : IExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,31 @@ public StepException(string message) : base(message) { }
/// <summary>Creates a <see cref="StepException"/> wrapping an inner exception.</summary>
public StepException(string message, Exception innerException) : base(message, innerException) { }
}

/// <summary>
/// Thrown when a child context's user function fails. Surfaces from
/// <c>RunInChildContextAsync</c>; the underlying error is preserved on the
/// <see cref="ErrorType"/>/<see cref="ErrorData"/>/<see cref="OriginalStackTrace"/>
/// fields. Use <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
/// domain-specific exception.
/// </summary>
/// <remarks>
/// When the user function has just thrown, the original exception is also
/// available as <see cref="Exception.InnerException"/>. When the exception is
/// rebuilt on replay from a checkpointed failure there is no inner exception,
/// and every descriptive property may be <c>null</c> if the corresponding
/// detail was not recorded.
/// </remarks>
public class ChildContextException : DurableExecutionException
{
/// <summary>
/// The child context's <see cref="ChildContextConfig.SubType"/>, if any.
/// </summary>
public string? SubType { get; init; }
/// <summary>
/// The fully-qualified type name of the original exception
/// (the value of <c>GetType().FullName</c> at the time of the failure).
/// </summary>
public string? ErrorType { get; init; }
/// <summary>
/// Optional structured error data attached by the user; <c>null</c> when
/// none was recorded with the failure.
/// </summary>
public string? ErrorData { get; init; }
/// <summary>
/// Stack trace of the original exception, captured before serialization,
/// one entry per stack-trace line; <c>null</c> when no stack trace is
/// available.
/// </summary>
public IReadOnlyList<string>? OriginalStackTrace { get; init; }

/// <summary>Creates an empty <see cref="ChildContextException"/>.</summary>
public ChildContextException() { }
/// <summary>Creates a <see cref="ChildContextException"/> with the given message.</summary>
/// <param name="message">Message describing the failure.</param>
public ChildContextException(string message) : base(message) { }
/// <summary>Creates a <see cref="ChildContextException"/> wrapping an inner exception.</summary>
/// <param name="message">Message describing the failure.</param>
/// <param name="innerException">The exception that caused the child context to fail.</param>
public ChildContextException(string message, Exception innerException) : base(message, innerException) { }
}
31 changes: 31 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,37 @@ Task WaitAsync(
TimeSpan duration,
string? name = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Run a user function inside a logical sub-workflow (a "child context").
/// The child has its own deterministic operation-ID space; its result is
/// checkpointed as a <c>CONTEXT</c> operation so subsequent invocations
/// replay the cached value without re-executing the func.
/// </summary>
/// <remarks>
/// Use child contexts to group related durable operations (e.g. a step plus
/// a wait plus a step) into a single observability/error-handling boundary.
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
/// domain-specific exception.
/// The child context's return value is serialized to a checkpoint using the
/// <see cref="ILambdaSerializer"/> registered on
/// <see cref="ILambdaContext.Serializer"/>.
/// </remarks>
/// <typeparam name="T">
/// Type of the value returned by <paramref name="func"/>; it must be
/// serializable by the registered <see cref="ILambdaSerializer"/>.
/// </typeparam>
/// <param name="func">
/// The user function to run. It receives the child
/// <see cref="IDurableContext"/> and should issue its durable operations
/// through that context.
/// </param>
/// <param name="name">Optional name recorded on the child context operation.</param>
/// <param name="config">
/// Optional settings (<see cref="ChildContextConfig.SubType"/>,
/// <see cref="ChildContextConfig.ErrorMapping"/>); <c>null</c> uses defaults.
/// </param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>
/// A task producing the value returned by <paramref name="func"/>, or the
/// previously checkpointed value when the child context is replayed.
/// </returns>
Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Run a user function inside a child context that returns no value.
/// </summary>
/// <remarks>
/// On failure, surfaces as <see cref="ChildContextException"/>; supply
/// <see cref="ChildContextConfig.ErrorMapping"/> to remap into a
/// domain-specific exception.
/// </remarks>
/// <param name="func">
/// The user function to run. It receives the child
/// <see cref="IDurableContext"/> and should issue its durable operations
/// through that context.
/// </param>
/// <param name="name">Optional name recorded on the child context operation.</param>
/// <param name="config">
/// Optional settings (<see cref="ChildContextConfig.SubType"/>,
/// <see cref="ChildContextConfig.ErrorMapping"/>); <c>null</c> uses defaults.
/// </param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <returns>A task that completes when the child context has finished.</returns>
Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
using System.IO;
using System.Text;
using Amazon.Lambda.Core;
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

namespace Amazon.Lambda.DurableExecution.Internal;

/// <summary>
/// Durable child context operation. Runs a user-supplied function inside a
/// nested <see cref="DurableContext"/> with its own deterministic operation-ID
/// space, persisting the function's result so subsequent invocations replay
/// the cached value without re-executing.
/// </summary>
/// <remarks>
/// Replay branches — example: <c>await ctx.RunInChildContextAsync(child =&gt; ..., name: "phase")</c>
/// <list type="bullet">
/// <item><b>Fresh</b>: no prior state → sync-flush CONTEXT START → run user
/// func → on success emit CONTEXT SUCCEED → on failure emit CONTEXT FAIL
/// and throw <see cref="ChildContextException"/>.</item>
/// <item><b>SUCCEEDED</b>: return cached deserialized result; user func is
/// NOT re-executed.</item>
/// <item><b>FAILED</b>: throw <see cref="ChildContextException"/> with the
/// recorded error; if <see cref="ChildContextConfig.ErrorMapping"/> is
/// set, the mapped exception is thrown instead.</item>
/// <item><b>STARTED</b> / <b>PENDING</b>: re-run the user func without
/// re-checkpointing START. The child's own operations recover from their
/// own checkpoints, so this is replay propagation; if a wait/callback
/// inside the child is still pending, the user func re-suspends.</item>
/// </list>
/// Unlike <see cref="StepOperation{T}"/>, child contexts have no retry strategy:
/// failure is terminal and surfaces immediately via
/// <see cref="ChildContextException"/>.
/// </remarks>
internal sealed class ChildContextOperation<T> : DurableOperation<T>
{
    private readonly Func<IDurableContext, Task<T>> _func;
    private readonly ChildContextConfig? _config;
    private readonly ILambdaSerializer _serializer;
    private readonly Func<string, IDurableContext> _childContextFactory;

    /// <summary>
    /// Creates a child context operation.
    /// </summary>
    /// <param name="operationId">ID of this CONTEXT operation; also passed to <paramref name="childContextFactory"/>.</param>
    /// <param name="name">Optional operation name sent with every update.</param>
    /// <param name="func">User function to run inside the child context.</param>
    /// <param name="config">Optional sub-type label and error mapping.</param>
    /// <param name="serializer">Serializer used for the checkpointed result payload.</param>
    /// <param name="childContextFactory">Builds the child <see cref="IDurableContext"/> from this operation's ID.</param>
    /// <param name="state">Execution state forwarded to the base operation.</param>
    /// <param name="termination">Termination manager forwarded to the base operation.</param>
    /// <param name="durableExecutionArn">Durable execution ARN forwarded to the base operation.</param>
    /// <param name="batcher">Optional checkpoint batcher forwarded to the base operation.</param>
    public ChildContextOperation(
        string operationId,
        string? name,
        Func<IDurableContext, Task<T>> func,
        ChildContextConfig? config,
        ILambdaSerializer serializer,
        Func<string, IDurableContext> childContextFactory,
        ExecutionState state,
        TerminationManager termination,
        string durableExecutionArn,
        CheckpointBatcher? batcher = null)
        : base(operationId, name, state, termination, durableExecutionArn, batcher)
    {
        _func = func;
        _config = config;
        _serializer = serializer;
        _childContextFactory = childContextFactory;
    }

    /// <summary>Operation type reported for this operation: a context operation.</summary>
    protected override string OperationType => OperationTypes.Context;

    /// <summary>
    /// Fresh execution: records CONTEXT START, then runs the user function.
    /// </summary>
    protected override async Task<T> StartAsync(CancellationToken cancellationToken)
    {
        // Sync-flush CONTEXT START before user code so the service has a record
        // of the parent context if the inner func suspends (e.g. a Wait inside
        // the child terminates the workflow before SUCCEED is reached).
        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            Type = OperationTypes.Context,
            Action = "START",
            SubType = _config?.SubType,
            Name = Name
        }, cancellationToken);

        return await ExecuteFunc(cancellationToken);
    }

    /// <summary>
    /// Replay of a previously checkpointed operation; the branch taken depends
    /// on the recorded status.
    /// </summary>
    /// <exception cref="NonDeterministicExecutionException">
    /// The recorded status is not one a child context can be in.
    /// </exception>
    protected override Task<T> ReplayAsync(Operation existing, CancellationToken cancellationToken)
    {
        switch (existing.Status)
        {
            case OperationStatuses.Succeeded:
                // Side-effecting code runs at most once: replay returns the
                // cached result without invoking the user func.
                return Task.FromResult(DeserializeResult(existing.ContextDetails?.Result));

            case OperationStatuses.Failed:
                throw MapFailureException(BuildChildContextException(existing));

            case OperationStatuses.Started:
            case OperationStatuses.Pending:
                // Re-run the user func: the child's own operations replay from
                // their own checkpoints. Do NOT re-checkpoint START — the
                // original is still authoritative. If something inside the
                // child is still pending (Wait, callback, retry) the user func
                // will re-suspend on its own.
                return ExecuteFunc(cancellationToken);

            default:
                throw new NonDeterministicExecutionException(
                    $"Child context operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
        }
    }

    /// <summary>
    /// Runs the user function in a new child context and checkpoints the
    /// outcome: SUCCEED with the serialized result, or FAIL with the error.
    /// </summary>
    private async Task<T> ExecuteFunc(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var childContext = _childContextFactory(OperationId);

        T result;
        try
        {
            result = await _func(childContext);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation requested through our token is not a child failure;
            // let it propagate without writing a FAIL checkpoint.
            throw;
        }
        catch (Exception ex)
        {
            var sdkError = ToSdkError(ex);

            await EnqueueAsync(new SdkOperationUpdate
            {
                Id = OperationId,
                Type = OperationTypes.Context,
                Action = "FAIL",
                SubType = _config?.SubType,
                Name = Name,
                Error = sdkError
            }, cancellationToken);

            // Populate the same descriptive fields that are written to the
            // FAIL checkpoint so an ErrorMapping function sees a consistently
            // shaped exception whether it runs on first failure or on replay.
            // The stack trace is copied so the exception does not share a
            // mutable list with the already-enqueued update.
            throw MapFailureException(new ChildContextException(ex.Message, ex)
            {
                SubType = _config?.SubType,
                ErrorType = sdkError.ErrorType,
                OriginalStackTrace = sdkError.StackTrace?.ToArray()
            });
        }

        await EnqueueAsync(new SdkOperationUpdate
        {
            Id = OperationId,
            Type = OperationTypes.Context,
            Action = "SUCCEED",
            SubType = _config?.SubType,
            Name = Name,
            Payload = SerializeResult(result)
        }, cancellationToken);

        return result;
    }

    /// <summary>
    /// Applies <see cref="ChildContextConfig.ErrorMapping"/> when configured;
    /// a missing mapper or a <c>null</c> mapping result yields
    /// <paramref name="ex"/> unchanged.
    /// </summary>
    private Exception MapFailureException(ChildContextException ex)
    {
        var mapper = _config?.ErrorMapping;
        if (mapper == null) return ex;

        var mapped = mapper(ex);
        return mapped ?? ex;
    }

    /// <summary>
    /// Rebuilds a <see cref="ChildContextException"/> from the error recorded
    /// on a failed operation. The recorded sub-type wins over the configured one.
    /// </summary>
    private ChildContextException BuildChildContextException(Operation failedOp)
    {
        var err = failedOp.ContextDetails?.Error;
        return new ChildContextException(err?.ErrorMessage ?? "Child context failed")
        {
            SubType = failedOp.SubType ?? _config?.SubType,
            ErrorType = err?.ErrorType,
            ErrorData = err?.ErrorData,
            OriginalStackTrace = err?.StackTrace
        };
    }

    /// <summary>
    /// Deserializes a checkpointed payload; a missing payload yields <c>default</c>.
    /// </summary>
    private T DeserializeResult(string? serialized)
    {
        if (serialized == null) return default!;
        var bytes = Encoding.UTF8.GetBytes(serialized);
        using var ms = new MemoryStream(bytes);
        return _serializer.Deserialize<T>(ms);
    }

    /// <summary>
    /// Serializes the result with the configured serializer and returns the
    /// output decoded as UTF-8 text.
    /// </summary>
    private string SerializeResult(T value)
    {
        using var ms = new MemoryStream();
        _serializer.Serialize(value, ms);
        return Encoding.UTF8.GetString(ms.ToArray());
    }

    /// <summary>
    /// Converts an exception into the error object sent with a FAIL update:
    /// full type name, message, and the stack trace split into non-empty lines
    /// (left <c>null</c> when the exception has no stack trace).
    /// </summary>
    private static SdkErrorObject ToSdkError(Exception ex) => new()
    {
        ErrorType = ex.GetType().FullName,
        ErrorMessage = ex.Message,
        StackTrace = ex.StackTrace?.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries).ToList()
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ private static Internal.Operation MapFromSdkOperation(SdkOperation sdkOp)
ExecutionDetails = sdkOp.ExecutionDetails != null ? new Internal.ExecutionDetails
{
InputPayload = sdkOp.ExecutionDetails.InputPayload
} : null,
ContextDetails = sdkOp.ContextDetails != null ? new Internal.ContextDetails
{
Result = sdkOp.ContextDetails.Result,
Error = sdkOp.ContextDetails.Error != null ? new ErrorObject
{
ErrorType = sdkOp.ContextDetails.Error.ErrorType,
ErrorMessage = sdkOp.ContextDetails.Error.ErrorMessage
} : null
} : null
Comment on lines +132 to 140
};
}
Expand Down
Loading
Loading