< Summary

Information
Class: Elsa.Http.Middleware.HttpWorkflowsMiddleware
Assembly: Elsa.Http
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs
Line coverage
67%
Covered lines: 135
Uncovered lines: 65
Coverable lines: 200
Total lines: 388
Line coverage: 67.5%
Branch coverage
67%
Covered branches: 35
Total branches: 52
Branch coverage: 67.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

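| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| InvokeAsync() | 70.83% | 25 | 24 | 87.23% |
| FindWorkflowGraphAsync() | 100% | 2 | 1 | 0% |
| FindTriggersAsync() | 100% | 2 | 1 | 0% |
| FindBookmarksAsync() | 100% | 1 | 1 | 100% |
| StartWorkflowAsync() | 100% | 1 | 1 | 100% |
| ResumeWorkflowAsync() | 50% | 6 | 6 | 82.6% |
| ExecuteWorkflowAsync() | 50% | 2 | 2 | 81.81% |
| <ExecuteWorkflowAsync() | 100% | 2 | 2 | 100% |
| ExecuteWithinTimeoutAsync() | 50% | 4 | 2 | 27.27% |
| GetMatchingRoute(...) | 100% | 4 | 4 | 100% |
| GetCorrelationIdAsync() | 100% | 4 | 4 | 100% |
| GetWorkflowInstanceIdAsync() | 100% | 4 | 4 | 100% |
| WriteResponseAsync() | 0% | 6 | 2 | 0% |
| HandleMultipleWorkflowsFoundAsync() | 100% | 2 | 1 | 0% |
| HandleWorkflowFaultAsync() | 50% | 8 | 4 | 37.5% |
| AuthorizeAsync() | 50% | 2 | 2 | 80% |
| ComputeBookmarkHash(...) | 100% | 1 | 1 | 100% |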

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs

# | Line | Line coverage
 1using Elsa.Extensions;
 2using Elsa.Http.Bookmarks;
 3using Elsa.Http.Options;
 4using Elsa.Workflows.Runtime.Filters;
 5using JetBrains.Annotations;
 6using Microsoft.AspNetCore.Http;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9using System.Net;
 10using System.Net.Mime;
 11using System.Text.Json;
 12using Elsa.Workflows.Activities;
 13using Elsa.Workflows.Runtime.Entities;
 14using FastEndpoints;
 15using System.Diagnostics.CodeAnalysis;
 16using Elsa.Workflows;
 17using Elsa.Workflows.Management;
 18using Elsa.Workflows.Management.Entities;
 19using Elsa.Workflows.Models;
 20using Elsa.Workflows.Options;
 21using Elsa.Workflows.Runtime;
 22using Open.Linq.AsyncExtensions;
 23
 24namespace Elsa.Http.Middleware;
 25
 26/// <summary>
 27/// An ASP.NET middleware component that tries to match the inbound request path to an associated workflow and then run that workflow.
 28/// </summary>
 29[PublicAPI]
 430public class HttpWorkflowsMiddleware(RequestDelegate next)
 31{
 32    /// <summary>
 33    /// Attempts to match the inbound request path to an associated workflow and then run that workflow.
 34    /// </summary>
 35    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 36    public async Task InvokeAsync(
 37        HttpContext httpContext,
 38        IServiceProvider serviceProvider,
 39        IOptions<HttpActivityOptions> options,
 40        IHttpWorkflowLookupService httpWorkflowLookupService)
 41    {
 29642        var path = httpContext.Request.Path.Value!.NormalizeRoute();
 29643        var matchingPath = GetMatchingRoute(serviceProvider, path).Route;
 29644        var basePath = options.Value.BasePath?.ToString().NormalizeRoute();
 45
 46        // If the request path does not match the configured base path to handle workflows, then skip.
 29647        if (!string.IsNullOrWhiteSpace(basePath))
 48        {
 29549            if (!path.StartsWith(basePath, StringComparison.OrdinalIgnoreCase))
 50            {
 3051                await next(httpContext);
 3052                return;
 53            }
 54
 55            // Strip the base path.
 26556            matchingPath = matchingPath[basePath.Length..];
 57        }
 58
 59        // Graceful-shutdown gate: when the runtime is paused or draining, we don't accept new HTTP-triggered work.
 60        // The ingress source registry visibility is provided by HttpTriggerIngressSource — this is the actual mechanism
 26661        var quiescenceSignal = serviceProvider.GetService<IQuiescenceSignal>();
 26662        if (quiescenceSignal is not null && !quiescenceSignal.IsAcceptingNewWork)
 63        {
 064            httpContext.Response.StatusCode = (int)HttpStatusCode.ServiceUnavailable;
 65            // Retry-After is reason-aware: drain is short (host is exiting and will be replaced shortly), but an
 66            // administrative pause is indefinite, so a longer back-off avoids a tight retry loop while operators
 67            // perform maintenance.
 068            httpContext.Response.Headers.RetryAfter = quiescenceSignal.CurrentState.Reason.HasFlag(QuiescenceReason.Drai
 069            return;
 70        }
 71
 26672        matchingPath = matchingPath.NormalizeRoute();
 73
 26674        var input = new Dictionary<string, object>
 26675        {
 26676            [HttpEndpoint.HttpContextInputKey] = true,
 26677            [HttpEndpoint.PathInputKey] = path
 26678        };
 79
 26680        var cancellationToken = httpContext.RequestAborted;
 26681        var request = httpContext.Request;
 26682        var method = request.Method.ToLowerInvariant();
 26683        var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
 26684        var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
 26685        var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
 26686        var lookupResult = await httpWorkflowLookupService.FindWorkflowAsync(bookmarkHash, cancellationToken);
 87
 26688        if (lookupResult != null)
 89        {
 25890            var triggers = lookupResult.Triggers;
 91
 25892            if (triggers.Count > 1)
 93            {
 094                await HandleMultipleWorkflowsFoundAsync(httpContext, () => triggers.Select(x => new
 095                {
 096                    x.WorkflowDefinitionId
 097                }), cancellationToken);
 098                return;
 99            }
 100
 258101            var trigger = triggers.FirstOrDefault();
 258102            if (trigger != null)
 103            {
 258104                var workflowGraph = lookupResult.WorkflowGraph!;
 258105                await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
 258106                return;
 107            }
 108        }
 109
 8110        var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cance
 111
 8112        if (bookmarks.Count > 1)
 113        {
 0114            await HandleMultipleWorkflowsFoundAsync(httpContext, () => bookmarks.Select(x => new
 0115            {
 0116                x.WorkflowInstanceId
 0117            }), cancellationToken);
 0118            return;
 119        }
 120
 8121        var bookmark = bookmarks.SingleOrDefault();
 122
 8123        if (bookmark != null)
 124        {
 2125            await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
 2126            return;
 127        }
 128
 129        // If a base path was configured, the requester tried to execute a workflow that doesn't exist.
 6130        if (basePath != null)
 131        {
 5132            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 5133            return;
 134        }
 135
 136        // If no base path was configured, the request should be handled by subsequent middlewares.
 1137        await next(httpContext);
 296138    }
 139
 140    private async Task<WorkflowGraph?> FindWorkflowGraphAsync(IServiceProvider serviceProvider, StoredTrigger trigger, C
 141    {
 0142        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 0143        var workflowDefinitionId = trigger.WorkflowDefinitionVersionId;
 0144        return await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, cancellationToken);
 0145    }
 146
 147    private async Task<IEnumerable<StoredTrigger>> FindTriggersAsync(IServiceProvider serviceProvider, string bookmarkHa
 148    {
 0149        var triggerStore = serviceProvider.GetRequiredService<ITriggerStore>();
 0150        var triggerFilter = new TriggerFilter
 0151        {
 0152            Hash = bookmarkHash
 0153        };
 0154        return await triggerStore.FindManyAsync(triggerFilter, cancellationToken);
 0155    }
 156
 157    private async Task<IEnumerable<StoredBookmark>> FindBookmarksAsync(IServiceProvider serviceProvider, string bookmark
 158    {
 8159        var bookmarkStore = serviceProvider.GetRequiredService<IBookmarkStore>();
 8160        var bookmarkFilter = new BookmarkFilter
 8161        {
 8162            Hash = bookmarkHash,
 8163            WorkflowInstanceId = workflowInstanceId,
 8164            CorrelationId = correlationId
 8165        };
 8166        return await bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken);
 8167    }
 168
 169    private async Task StartWorkflowAsync(HttpContext httpContext, StoredTrigger trigger, WorkflowGraph workflowGraph, I
 170    {
 258171        var bookmarkPayload = trigger.GetPayload<HttpEndpointBookmarkPayload>();
 258172        var workflowOptions = new RunWorkflowOptions
 258173        {
 258174            Input = input,
 258175            CorrelationId = correlationId,
 258176            TriggerActivityId = trigger.ActivityId,
 258177            WorkflowInstanceId = workflowInstanceId
 258178        };
 179
 258180        await ExecuteWorkflowAsync(httpContext, workflowGraph, workflowOptions, bookmarkPayload, null, input);
 258181    }
 182
 183    private async Task ResumeWorkflowAsync(HttpContext httpContext, StoredBookmark bookmark, IDictionary<string, object>
 184    {
 2185        var serviceProvider = httpContext.RequestServices;
 2186        var cancellationToken = httpContext.RequestAborted;
 2187        var bookmarkPayload = bookmark.GetPayload<HttpEndpointBookmarkPayload>();
 2188        var workflowInstanceStore = serviceProvider.GetRequiredService<IWorkflowInstanceStore>();
 2189        var workflowInstance = await workflowInstanceStore.FindAsync(bookmark.WorkflowInstanceId, cancellationToken);
 190
 2191        if (workflowInstance == null)
 192        {
 0193            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0194            return;
 195        }
 196
 2197        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 2198        var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId,
 199
 2200        if (workflowGraph == null)
 201        {
 0202            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0203            return;
 204        }
 205
 2206        var runWorkflowParams = new RunWorkflowOptions
 2207        {
 2208            WorkflowInstanceId = workflowInstance.Id,
 2209            Input = input,
 2210            CorrelationId = correlationId,
 2211            ActivityHandle = bookmark.ActivityInstanceId != null ? ActivityHandle.FromActivityInstanceId(bookmark.Activi
 2212            BookmarkId = bookmark.Id
 2213        };
 214
 2215        await ExecuteWorkflowAsync(httpContext, workflowGraph, runWorkflowParams, bookmarkPayload, workflowInstance, inp
 2216    }
 217
 218    private async Task ExecuteWorkflowAsync(HttpContext httpContext, WorkflowGraph workflowGraph, RunWorkflowOptions wor
 219    {
 260220        var serviceProvider = httpContext.RequestServices;
 260221        var cancellationToken = httpContext.RequestAborted;
 260222        var workflow = workflowGraph.Workflow;
 223
 260224        if (!await AuthorizeAsync(serviceProvider, httpContext, workflow, bookmarkPayload, cancellationToken))
 225        {
 0226            httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
 0227            return;
 228        }
 229
 260230        var workflowRunner = serviceProvider.GetRequiredService<IWorkflowRunner>();
 260231        var result = await ExecuteWithinTimeoutAsync(async ct =>
 260232        {
 260233            if (workflowInstance == null)
 258234                return await workflowRunner.RunAsync(workflowGraph, workflowOptions, ct);
 2235            return await workflowRunner.RunAsync(workflow, workflowInstance.WorkflowState, workflowOptions, ct);
 520236        }, bookmarkPayload.RequestTimeout, httpContext);
 260237        await HandleWorkflowFaultAsync(serviceProvider, httpContext, result, cancellationToken);
 260238    }
 239
 240    private async Task<T> ExecuteWithinTimeoutAsync<T>(Func<CancellationToken, Task<T>> action, TimeSpan? requestTimeout
 241    {
 242        // If no request timeout is specified, execute the action without any timeout.
 260243        if (requestTimeout == null)
 260244            return await action(httpContext.RequestAborted);
 245
 246        // Create a combined cancellation token that cancels when the request is aborted or when the request timeout is 
 0247        using var requestTimeoutCancellationTokenSource = new CancellationTokenSource();
 0248        requestTimeoutCancellationTokenSource.CancelAfter(requestTimeout.Value);
 0249        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, requ
 0250        var originalCancellationToken = httpContext.RequestAborted;
 251
 252        // Replace the original cancellation token with the combined one.
 0253        httpContext.RequestAborted = combinedTokenSource.Token;
 254
 255        // Execute the action.
 0256        var result = await action(httpContext.RequestAborted);
 257
 258        // Restore the original cancellation token.
 0259        httpContext.RequestAborted = originalCancellationToken;
 260
 0261        return result;
 260262    }
 263
 264    private HttpRouteData GetMatchingRoute(IServiceProvider serviceProvider, string path)
 265    {
 296266        var routeMatcher = serviceProvider.GetRequiredService<IRouteMatcher>();
 296267        var routeTable = serviceProvider.GetRequiredService<IRouteTable>();
 268
 296269        var matchingRouteQuery =
 296270            from routeData in routeTable
 3491271            let routeValues = routeMatcher.Match(routeData.Route, path)
 3491272            where routeValues != null
 560273            select new
 560274            {
 560275                route = routeData,
 560276                routeValues
 560277            };
 278
 296279        var matchingRoute = matchingRouteQuery.FirstOrDefault();
 296280        var routeTemplate = matchingRoute?.route ?? new HttpRouteData(path);
 281
 296282        return routeTemplate;
 283    }
 284
 285    private async Task<string?> GetCorrelationIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Cancell
 286    {
 266287        var correlationIdSelectors = serviceProvider.GetServices<IHttpCorrelationIdSelector>();
 288
 266289        var correlationId = default(string);
 290
 2118291        foreach (var selector in correlationIdSelectors.OrderByDescending(x => x.Priority))
 292        {
 530293            correlationId = await selector.GetCorrelationIdAsync(httpContext, cancellationToken);
 294
 530295            if (correlationId != null)
 296                break;
 297        }
 298
 266299        return correlationId;
 266300    }
 301
 302    private async Task<string?> GetWorkflowInstanceIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Ca
 303    {
 266304        var workflowInstanceIdSelectors = serviceProvider.GetServices<IHttpWorkflowInstanceIdSelector>();
 305
 266306        var workflowInstanceId = default(string);
 307
 2118308        foreach (var selector in workflowInstanceIdSelectors.OrderByDescending(x => x.Priority))
 309        {
 530310            workflowInstanceId = await selector.GetWorkflowInstanceIdAsync(httpContext, cancellationToken);
 311
 530312            if (workflowInstanceId != null)
 313                break;
 314        }
 315
 266316        return workflowInstanceId;
 266317    }
 318
 319    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 320    private static async Task WriteResponseAsync(HttpContext httpContext, CancellationToken cancellationToken)
 321    {
 0322        var response = httpContext.Response;
 323
 0324        if (!response.HasStarted)
 325        {
 0326            response.ContentType = MediaTypeNames.Application.Json;
 0327            response.StatusCode = StatusCodes.Status200OK;
 328
 0329            var model = new
 0330            {
 0331                workflowInstanceIds = Array.Empty<string>(),
 0332            };
 333
 0334            var json = JsonSerializer.Serialize(model);
 0335            await response.WriteAsync(json, cancellationToken);
 336        }
 0337    }
 338
 339    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 340    private async Task<bool> HandleMultipleWorkflowsFoundAsync(HttpContext httpContext, Func<IEnumerable<object>> workfl
 341    {
 0342        httpContext.Response.ContentType = "application/json";
 0343        httpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
 344
 0345        var responseContent = JsonSerializer.Serialize(new
 0346        {
 0347            errorMessage = "The call is ambiguous and matches multiple workflows.",
 0348            workflows = workflowMatches().ToArray()
 0349        });
 350
 0351        await httpContext.Response.WriteAsync(responseContent, cancellationToken);
 0352        return true;
 0353    }
 354
 355    private async Task<bool> HandleWorkflowFaultAsync(IServiceProvider serviceProvider, HttpContext httpContext, RunWork
 356    {
 260357        if (!workflowExecutionResult.WorkflowState.Incidents.Any() || httpContext.Response.HasStarted)
 260358            return false;
 359
 0360        var httpEndpointFaultHandler = serviceProvider.GetRequiredService<IHttpEndpointFaultHandler>();
 0361        var workflowInstanceManager = serviceProvider.GetRequiredService<IWorkflowInstanceManager>();
 0362        var workflowState = (await workflowInstanceManager.FindByIdAsync(workflowExecutionResult.WorkflowState.Id, cance
 0363        await httpEndpointFaultHandler.HandleAsync(new(httpContext, workflowState.WorkflowState, cancellationToken));
 0364        return true;
 260365    }
 366
 367    private async Task<bool> AuthorizeAsync(
 368        IServiceProvider serviceProvider,
 369        HttpContext httpContext,
 370        Workflow workflow,
 371        HttpEndpointBookmarkPayload bookmarkPayload,
 372        CancellationToken cancellationToken)
 373    {
 260374        var httpEndpointAuthorizationHandler = serviceProvider.GetRequiredService<IHttpEndpointAuthorizationHandler>();
 375
 260376        if (bookmarkPayload.Authorize == false)
 260377            return true;
 378
 0379        return await httpEndpointAuthorizationHandler.AuthorizeAsync(new(httpContext, workflow, bookmarkPayload.Policy))
 260380    }
 381
 382    private string ComputeBookmarkHash(IServiceProvider serviceProvider, string path, string method)
 383    {
 266384        var bookmarkPayload = new HttpEndpointBookmarkPayload(path, method);
 266385        var bookmarkHasher = serviceProvider.GetRequiredService<IStimulusHasher>();
 266386        return bookmarkHasher.Hash(HttpStimulusNames.HttpEndpoint, bookmarkPayload);
 387    }
 388}