Parameterising the Execute Pipeline activity

Azure Data Factory's Execute Pipeline activity is used to trigger one pipeline from another. It's useful for orchestrating large ETL/ELT workloads because it enables multiple pipelines to be triggered in the right order, in response to a single execution schedule or event.

A shortcoming of the activity is that the pipeline to be triggered must be hard-coded into the activity – so it's impossible to use metadata-driven approaches like iterating over a list of pipeline names. This is why, for example, Paul Andrew's ADF.procfwk uses Azure Functions to trigger pipelines – the Execute Pipeline activity just isn't flexible enough.

Another limitation is that the activity can only trigger pipelines in the same ADF instance. Dan Perlovsky shows an alternative approach to triggering pipelines using ADF's Web activity – in this post I build on that to create an ADF pipeline that executes another pipeline, identified only by name, and waits for it to complete.

ADF's REST API supports a set of HTTP operations for interacting with a data factory. You can trigger a pipeline in any data factory by calling its pipelines/createrun API endpoint – here I'm going to use ADF's Web activity to call that endpoint for the same factory.

By doing this I'm connecting to the factory from outside of it, so the pipeline I want to trigger must be published. External connections to a data factory instance can only ever access published pipelines – unpublished changes in source control or in your ADF UX session are not accessible via any API connection.

I've created a new pipeline, called “Execute Pipeline”, and added a Web activity to the pipeline canvas. The activity is configured like this:

  • URL is the address of the API endpoint – see below

  • The HTTP Method for the pipelines/createrun endpoint is POST

  • Body is mandatory for POST requests – for now I've just set it to an empty JSON object, {}

  • Calling an API endpoint means I'll be connecting to the factory from outside – so the connection needs to be authenticated. For Authentication I'm using the factory's managed service identity (MSI).

  • The factory's MSI will be used to request an access token for the REST API call. Resource indicates the URI of the API to which I'm requesting access – https://management.azure.com/ is the resource URI of the Azure Resource Manager API.
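Putting that together, a sketch of the activity's JSON at this stage looks something like the following – the property names follow ADF's Web activity schema, “Start pipeline run” is the name I'll use when referring to this activity later, and the URL is still a placeholder:

  {
    "name": "Start pipeline run",
    "type": "WebActivity",
    "typeProperties": {
      "url": "<pipelines/createrun endpoint URL – see below>",
      "method": "POST",
      "body": "{}",
      "authentication": {
        "type": "MSI",
        "resource": "https://management.azure.com/"
      }
    }
  }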

The pipelines/createrun API endpoint URI looks like this:

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelines/{pipelineName}/createRun?api-version=2018-06-01

I need to replace the four {placeholders} with values for the pipeline I want to call. I'll do this using a variety of parameters and system variables:

  • subscriptionId and resourceGroupName are constant values for my data factory, so I've implemented them as global parameters

  • factoryName is available as an ADF system variable

  • pipelineName identifies the pipeline I want to execute – this is going to be a parameter of the “Execute Pipeline” pipeline.

The resulting ADF expression for the endpoint URI is this:

https://management.azure.com/subscriptions/@{pipeline().globalParameters.SubscriptionId}/resourceGroups/@{pipeline().globalParameters.ResourceGroupName}/providers/Microsoft.DataFactory/factories/@{pipeline().DataFactory}/pipelines/@{pipeline().parameters.PipelineName}/createRun?api-version=2018-06-01

When I call the API endpoint to trigger the pipeline, I specify any parameters it needs in a JSON object – this will be the body of the POST request. This JSON object is also a parameter of the “Execute Pipeline” pipeline – the pipeline now has two String parameters:

  • PipelineName – the name of the pipeline to trigger

  • PipelineParametersJson – a JSON object containing any parameters for the triggered pipeline. This parameter has a default value of {}, so that pipelines without parameters still provide the necessary empty POST body.

Here's the top part of the Web activity's updated Settings configuration tab:
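The key change is the Body setting, which now refers to the new PipelineParametersJson parameter using dynamic content – one way to wire it up is an expression like this:

  @pipeline().parameters.PipelineParametersJson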

By default, a data factory's managed service identity has no access to the factory – after all, the factory doesn't usually manage its own pipelines externally! To enable the factory to call its own API endpoints, its MSI must be granted the Contributor role on the factory, for example via a role assignment in the Azure portal.

ADF's REST API is fire-and-forget – a call to trigger a pipeline run returns immediately after requesting the run. The pipeline is queued and executed as requested; meanwhile, the calling application – in this case the “Execute Pipeline” pipeline – moves on to its next step. A common requirement, supported by ADF's Execute Pipeline activity, is to wait for a triggered pipeline to complete before moving on.
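All that comes back from the createrun call is the identifier of the new run – a response something like this, where the runId value is illustrative – and it's that runId the rest of the pipeline uses to poll for completion:

  {
    "runId": "00000000-0000-0000-0000-000000000000"
  }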

Implementing wait for completion has three main components:

  1. wait a bit
  2. check if what we're waiting for has finished yet
  3. repeat steps 1 & 2 until what we're waiting for has finished

This pattern can be implemented using ADF's Until activity.
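As a sketch, the Until activity's JSON has this shape – the activity name is mine, the activities array (empty here) will hold the three activities described next, and the expression is the terminating condition defined later in this post:

  {
    "name": "Wait for pipeline run",
    "type": "Until",
    "typeProperties": {
      "expression": {
        "value": "<terminating condition – defined below>",
        "type": "Expression"
      },
      "activities": []
    }
  }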

This isn't the only way to do this. An alternative approach is to trigger the pipeline using the WebHook activity, but this requires the triggered pipeline to implement its own explicit callback on completion. In contrast, the polling-based approach I'm using here makes no demands on the triggered pipeline.

The body of my Until activity looks like this:

  • The Wait activity waits five seconds

  • Another Web activity calls the Pipeline Runs - Get API endpoint to query the status of the pipeline run

  • A Set variable activity copies the returned pipeline run status into a pipeline variable.

The pipelineruns/get API endpoint URI looks like this:

https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.DataFactory/factories/{factoryName}/pipelineruns/{runId}?api-version=2018-06-01

I build the REST API endpoint URI in the same way as before, using the runId returned by the Web activity that initially triggered the pipeline:

https://management.azure.com/subscriptions/@{pipeline().globalParameters.SubscriptionId}/resourceGroups/@{pipeline().globalParameters.ResourceGroupName}/providers/Microsoft.DataFactory/factories/@{pipeline().DataFactory}/pipelineruns/@{activity('Start pipeline run').output.runId}?api-version=2018-06-01

The pipelineruns/get endpoint uses the HTTP GET method, so the top of the Settings configuration tab looks like this:
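The response describes the run, including its current status – trimmed to the relevant fields, it looks something like this (values illustrative):

  {
    "runId": "00000000-0000-0000-0000-000000000000",
    "pipelineName": "TestWait",
    "status": "InProgress"
  }

It's the status field that the Set variable activity copies into the pipeline variable, using an expression like this – “Get pipeline run” is my name for the polling Web activity, so substitute your own:

  @activity('Get pipeline run').output.status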

The loop terminates when the triggered pipeline has finished executing, i.e. when the RunStatus variable has been updated with a pipeline run status indicating that execution is complete. This is indicated by any of three status values, so I've put them all in an array variable. I now have two pipeline variables:
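In the pipeline's JSON the two variables look something like this – assuming the three terminal run statuses are Succeeded, Failed and Cancelled:

  "variables": {
    "RunStatus": {
      "type": "String"
    },
    "CompletionStatusList": {
      "type": "Array",
      "defaultValue": ["Succeeded", "Failed", "Cancelled"]
    }
  }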

The terminating condition for the loop (the Expression on the Until activity's Settings tab) is:

@contains(variables('CompletionStatusList'),variables('RunStatus'))

Now the “Execute Pipeline” pipeline will trigger a pipeline and wait for it to finish. This is great, but it reports success however the triggered pipeline ends – even if the triggered pipeline fails, the “Execute Pipeline” pipeline returns success. This isn't consistent with the behaviour of ADF's Execute Pipeline activity, which fails when the pipeline it executes fails.

ADF doesn't have a “Raise error” activity, so I have to find some other way to force an error if the triggered pipeline fails. A common way to do this is to use an Azure SQL DB connection to execute a T-SQL RAISERROR, but I don't want to have to use a SQL connection just for that. Instead, I'm going to force an error by casting a non-integer string to an int.

I do this in a Set variable activity using this ADF expression:

@string(int('not an int'))

The Set variable activity is inside the True branch of an If Condition activity – the condition for execution is that the RunStatus pipeline variable has the value 'Failed'. It doesn't matter which variable I try to set, because the activity is going to fail anyway.
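As a sketch, the If Condition's expression is just a comparison on the run status:

  @equals(variables('RunStatus'), 'Failed')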

Here's the resulting “Execute Pipeline” pipeline:

Note that this approach allows an error to be propagated, but does not also propagate the underlying error message – the message can be found in the triggered pipeline's execution history.

To use the “Execute Pipeline” pipeline, I call it using the Execute Pipeline activity. The screenshot shows a testing ADF pipeline containing two Execute Pipeline activities, both calling the “Execute Pipeline” pipeline.

  • The activity on the left passes in a PipelineName of “TestWait”. The “TestWait” pipeline takes a “WaitTime” parameter which is specified in the PipelineParametersJson object with a value of 20:

    {"WaitTime":20}

  • The activity on the right passes in a PipelineName of “TestFail”.

As their names suggest, the “TestWait” pipeline waits for a specified duration and the “TestFail” pipeline fails every time. The screenshot shows that the testing pipeline has already been run in Debug mode – the activity calling the “TestFail” pipeline has itself failed, because of the error forced by the “Execute Pipeline” pipeline.
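For reference, here's a sketch of one of the calling activities in the test pipeline's JSON – the activity name is mine, and note that PipelineParametersJson is passed as a string:

  {
    "name": "Execute TestWait by name",
    "type": "ExecutePipeline",
    "typeProperties": {
      "pipeline": {
        "referenceName": "Execute Pipeline",
        "type": "PipelineReference"
      },
      "waitOnCompletion": true,
      "parameters": {
        "PipelineName": "TestWait",
        "PipelineParametersJson": "{\"WaitTime\":20}"
      }
    }
  }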

ADF's Execute Pipeline activity can only execute pipelines in the same data factory and requires hard-coded pipeline names. The “Execute Pipeline” pipeline works around these restrictions by using the Web activity to trigger pipelines via ADF's REST API, then waiting for completion with an Until activity.

If the triggered pipeline fails, the “Execute Pipeline” pipeline forces an error so that the calling Execute Pipeline activity also fails. This is consistent with the behaviour of the Execute Pipeline activity, but details of the underlying error are less readily available.

This is a fairly basic implementation of the approach and would benefit from improvement. Some ideas:

  • Don't allow the “Execute Pipeline” pipeline to accept a PipelineName value of 'Execute Pipeline' – calling itself could set off an endless chain of nested runs. I haven't tried this because I'm not sure how easy it would be to stop!
  • The Execute Pipeline activity has a boolean “Wait for completion” option – you can choose to use it in fire-and-forget mode or wait-for-completion mode as you wish. The “Execute Pipeline” pipeline could be extended to support parameter-based mode selection.
  • The five-second wait used inside the Until activity is hard-coded into the Wait activity. Parameterising the wait duration might also be of interest.