Unit testing Azure Data Factory pipelines

This is part five of my series about automated testing for Azure Data Factory (ADF) pipelines. If this is the first part you've read, you may prefer to start at the beginning.

In part three of this series I looked at functional tests for ADF pipelines: verifying, in isolation, that pipelines are “doing things right”. In this post I'll be testing isolated pipelines to check that they're “doing the right things” – this is one description of a unit test. In a general-purpose programming language, unit tests might be used to verify that an individual line of code is executed, or that it has a particular effect. In Azure Data Factory, the smallest unit of development – a “line of code” – is a pipeline activity. I will be writing tests to verify that specific activities are executed (or not) and to inspect their results.

The ADF pipeline I'll be testing is called “PL_Stage_Titles_With_Warning”. It's based on the PL_Stage_Titles_With_Stub pipeline I introduced in part 3, which supports injection of a stub source table. In the new pipeline, the original “Log pipeline end” Stored procedure activity is replaced by an If Condition of the same name:

This is the If Condition expression (visible in the screenshot above; reproduced here for clarity):

@less(
  activity('Copy src_Titles to stg_Titles').output.rowsCopied
, pipeline().parameters._RowCountWarningThreshold
)

The expression evaluates to true if the number of rows copied by the “Copy src_Titles to stg_Titles” activity is less than the value of pipeline parameter “_RowCountWarningThreshold”. The purpose of the condition is to allow a warning message to be added to the logged pipeline outcome, if the number of rows copied falls below the specified threshold:

  • if the expression evaluates to false, stored procedure [dbo].[LogPipelineEnd] is called in the same way as before
  • if the expression evaluates to true the same SP is called, but with an extra warning message parameter.

This is the Stored procedure activity executed when the expression evaluates to true:

My pipeline test is structured in the same way as previous tests. My test fixture:

  • specifies the scenario and runs the pipeline in [OneTimeSetUp]
  • runs [Test]s using helper class functionality.

To start with a simple test, I want to verify that five activities are executed when the pipeline runs – these are the four activities visible in the pipeline screenshot, plus whichever one of the two logging activities is called (depending on whether the If Condition evaluates true or false).

You may prefer to follow this section in the complete VS solution. All the code in this article is available on GitHub – there's a link at the end.

Here's the start of my test fixture, including the new test:

namespace PL_Stage_Titles_With_Warning.UnitTests
{
    public class Given10Rows
    {
        private PLStageTitlesWithWarningHelper _helper;
 
        [OneTimeSetUp]
        public async Task WhenPipelineIsRun()
        {
            _helper = new PLStageTitlesWithWarningHelper()
                .WithSourceTable("test.TenTitleStub")
                .WithRowCountWarningThreshold(50);
            await _helper.RunPipeline();
        }
 
        [Test]
        public async Task Then5ActivitiesAreRun()
        {
            var count = await _helper.GetActivityRunCount();
            count.Should().Be(5);
        }
  • The [OneTimeSetUp] method chains a call to a new helper method, WithRowCountWarningThreshold(), injecting the warning threshold to be used for the test. The helper method, defined in PLStageTitlesWithWarningHelper, populates an ADF pipeline parameter named _RowCountWarningThreshold with the injected value.
  • The new test method Then5ActivitiesAreRun() uses another new helper method: GetActivityRunCount(). I want to reuse this method for other pipelines, so I implement it in a new “pipeline run helper” class (details below). Because the new helper method is awaitable, I store its result in a count variable and assert against that.

The .NET machinery for interacting with Azure Data Factory (in the data factory helper) doesn't make for very readable code, particularly now that I'm extending ADF interaction to include pipeline activities. To improve on that, I separate the logical view of a pipeline run from the ADF machinery by introducing a new helper class. The new class fits into the hierarchy like this:

The pipeline run helper looks like a less-cluttered version of the original data factory helper. Here's an extract:

public T WithParameter(string name, object value)
{
    _parameters[name] = value;
    return (T)this;
}
 
public async Task RunPipeline(string pipelineName)
{
    if (_hasRun)
        throw new Exception("RunPipeline() can only be called once per instance lifetime");
    _hasRun = true;
 
    RunId = await TriggerPipeline(pipelineName, _parameters);
    while (await IsInProgress(RunId))
        await Task.Delay(2000);  // poll every two seconds without blocking the thread
    RunOutcome = await GetRunStatus(RunId);
}
 
public async Task<int> GetActivityRunCount(string pattern = ".*")
{
    await InitialiseActivityRuns();
    Regex rgx = new Regex(pattern);
    return _activityRuns.Count(ar => rgx.IsMatch(ar.ActivityName));
}
 
private async Task InitialiseActivityRuns()
{
    if (_activityRuns == null)
        _activityRuns = await GetActivityRuns(RunId);
}

Here you can see that:

  • It's the new helper which is now responsible for pipeline run parameters (using WithParameter()), because pipeline parameter values are specific to each pipeline run.
  • The RunPipeline() method starts the pipeline, then waits for it to finish, but the detail of triggering and monitoring it remains in the data factory helper. This makes the new helper more readable.
  • GetActivityRunCount() (the method called in the test fixture) returns the number of runs of activities whose names match a given pattern. By default, all activity runs performed during the pipeline run are counted.
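
The article does not show how WithRowCountWarningThreshold() is implemented, so the following is only a minimal sketch of one way a pipeline-specific helper could build on WithParameter(). The class names PipelineRunHelperSketch and StageTitlesHelperSketch and the Parameters property are hypothetical; only WithParameter() and the parameter name _RowCountWarningThreshold come from the text above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the pipeline run helper: only parameter handling is sketched.
// The self-referencing type parameter lets WithParameter() return the derived type,
// so calls can be chained fluently on the concrete helper.
public abstract class PipelineRunHelperSketch<T> where T : PipelineRunHelperSketch<T>
{
    private readonly Dictionary<string, object> _parameters = new Dictionary<string, object>();

    // Exposed here only so the sketch can show what was stored (an assumption).
    public IReadOnlyDictionary<string, object> Parameters => _parameters;

    public T WithParameter(string name, object value)
    {
        _parameters[name] = value;
        return (T)this;
    }
}

// Hypothetical pipeline-specific helper: gives the parameter a readable, typed method name.
public class StageTitlesHelperSketch : PipelineRunHelperSketch<StageTitlesHelperSketch>
{
    public StageTitlesHelperSketch WithRowCountWarningThreshold(int threshold)
        => WithParameter("_RowCountWarningThreshold", threshold);
}

public static class Program
{
    public static void Main()
    {
        var helper = new StageTitlesHelperSketch().WithRowCountWarningThreshold(50);
        Console.WriteLine(helper.Parameters["_RowCountWarningThreshold"]);
    }
}
```

Keeping the ADF parameter name inside the helper means a test fixture never has to spell it out, which is presumably why the fixture above reads as a plain description of the scenario.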

GetActivityRunCount() uses a private method to initialise a local collection of activity runs (_activityRuns). This avoids having to make multiple calls to ADF – as soon as any activity information is required for a pipeline run, all activity run data is retrieved and cached locally using GetActivityRuns(), a new method in the data factory helper.

The changes I've made mean that the data factory helper now needs to use an ADF client connection in several different places, so it uses an instance variable to reference the client. The variable is initialised using InitialiseClient() – client authentication is awaitable, so initialisation can't be done in the constructor. Each public method begins with a call to InitialiseClient() to ensure that the client is ready to do whatever work is required.

This extract shows the initialisation method (line 3), the TriggerPipeline() method used to start a pipeline run (and which now returns the ID of the created run; line 16) and the GetActivityRuns() method used to retrieve all activities executed during a pipeline run (line 23):

  1. private DataFactoryManagementClient _adfClient;
  2.
  3. private async Task InitialiseClient()
  4. {
  5.     if (_adfClient != null)
  6.         return;
  7.
  8.     var context = new AuthenticationContext("https://login.windows.net/" + GetSetting("AZURE_TENANT_ID"));
  9.     var cc = new ClientCredential(GetSetting("AZURE_CLIENT_ID"), GetSetting("AZURE_CLIENT_SECRET"));
  10.     var authResult = await context.AcquireTokenAsync("https://management.azure.com/", cc);
  11.
  12.     var cred = new TokenCredentials(authResult.AccessToken);
  13.     _adfClient = new DataFactoryManagementClient(cred) { SubscriptionId = GetSetting("AZURE_SUBSCRIPTION_ID") };
  14. }
  15.
  16. public async Task<string> TriggerPipeline(string pipelineName, IDictionary<string, object> parameters)
  17. {
  18.     await InitialiseClient();
  19.     var response = await _adfClient.Pipelines.CreateRunWithHttpMessagesAsync(_rgName, _adfName, pipelineName, parameters: parameters);
  20.     return response.Body.RunId;
  21. }
  22.
  23. public async Task<List<ActivityRun>> GetActivityRuns(string pipelineRunId)
  24. {
  25.     await InitialiseClient();
  26.
  27.     var filter = new RunFilterParameters(DateTime.MinValue, DateTime.UtcNow);
  28.     var arqr = await _adfClient.ActivityRuns.QueryByPipelineRunAsync(_rgName, _adfName, pipelineRunId, filter);
  29.     var activityRuns = arqr.Value.ToList();
  30.
  31.     while (!string.IsNullOrWhiteSpace(arqr.ContinuationToken))
  32.     {
  33.         filter.ContinuationToken = arqr.ContinuationToken;
  34.         arqr = await _adfClient.ActivityRuns.QueryByPipelineRunAsync(_rgName, _adfName, pipelineRunId, filter);
  35.         activityRuns.AddRange(arqr.Value);
  36.     }
  37.
  38.     return activityRuns;
  39. }
  40.
  41. public virtual void TearDown()
  42. {
  43.     _adfClient?.Dispose();
  44. }

For large pipelines, activity run data is returned in more than one “page” – the purpose of the while loop in GetActivityRuns() is to iterate over all pages.

My original data factory helper auto-disposed the client with a using statement, but I can't do that if I want to be able to use it more than once. Instead, I've added a TearDown() method to dispose the client explicitly when I've run all the tests in a fixture – I'll talk about how that gets called later.

The GetActivityRunCount() method I introduced above can optionally take an activity name pattern parameter. I can use this to verify the number of times a specific activity was executed.

The pipeline under test, “PL_Stage_Titles_With_Warning”, uses an If Condition to execute one of two Stored procedure activities depending on whether the copied row count falls below a certain threshold. For given input tables it should be possible to assert which of the two activities is called.

In the case of the Given10Rows test fixture above, I expect the “Log pipeline end with warning” activity to be called once, and “Log pipeline end without warning” not to be called at all. I assert this as follows:

[Test]
public async Task ThenLogWithWarningRunOnce()
{
  var count = await _helper.GetActivityRunCount("Log pipeline end with warning");
  count.Should().Be(1);
}
 
[Test]
public async Task ThenLogWithoutWarningNotRun()
{
  var count = await _helper.GetActivityRunCount("Log pipeline end without warning");
  count.Should().Be(0);
}

In the Given100Rows case (using stub table [test].[HundredTitleStub]) I can make the opposite pair of assertions: that activity “Log pipeline end without warning” is run once and that “Log pipeline end with warning” is not run.
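
One thing to keep in mind when choosing patterns: GetActivityRunCount() uses Regex.IsMatch(), which succeeds if the pattern matches anywhere in the activity name. The two patterns above do not collide with each other, but a shorter pattern such as "Log pipeline end" would also count the logging activities whose names start with it. The sketch below reproduces the counting logic on a plain array of names to show the difference. CountMatches() and Exact() are hypothetical helpers written for this illustration, not part of the article's code.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

public static class Program
{
    // Same counting logic as GetActivityRunCount(), applied to a plain list of names.
    public static int CountMatches(string[] activityNames, string pattern)
    {
        var rgx = new Regex(pattern);
        return activityNames.Count(name => rgx.IsMatch(name));
    }

    // Hypothetical helper: builds a pattern that matches one exact activity name.
    public static string Exact(string activityName)
        => "^" + Regex.Escape(activityName) + "$";

    public static void Main()
    {
        // Two of the activity names that could appear in a single Given10Rows run
        var names = new[]
        {
            "Log pipeline end",              // the If Condition
            "Log pipeline end with warning"  // the activity in its true branch
        };

        // Unanchored: counts both names, because both contain the pattern
        Console.WriteLine(CountMatches(names, "Log pipeline end"));

        // Anchored and escaped: counts only the If Condition itself
        Console.WriteLine(CountMatches(names, Exact("Log pipeline end")));
    }
}
```

Anchoring with ^ and $ (and escaping the name) is one way to count a single activity when its name is a prefix of another's.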

I can also make assertions about activities' outputs. The output of the “Copy src_Titles to stg_Titles” Copy Data activity is a JSON object that starts like this:

{
    "dataRead": 120,
    "dataWritten": 160,
    "sourcePeakConnections": 1,
    "sinkPeakConnections": 2,
    "rowsRead": 10,
    "rowsCopied": 10,
    "copyDuration": 12,
    "throughput": 0.01,
...

By inspecting an activity's output JSON I can assert against its properties – for example, in Given10Rows I can verify that 10 rows were copied by inspecting the rowsCopied property. I do this with two new methods:

  • Then10RowsAreCopied() is the new test in the Given10Rows test fixture:

    [Test]
    public async Task Then10RowsAreCopied()
    {
        var rowsCopied = await _helper.GetActivityOutput("Copy src_Titles to stg_Titles", "$.rowsCopied");
        int.Parse(rowsCopied).Should().Be(10);
    }
  • GetActivityOutput() is defined in the new pipeline run helper class. For a given activity name and JSON property path, it returns the associated property value:

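    public async Task<string> GetActivityOutput(string activityName, string propertyPath = "$")
    {
        await InitialiseActivityRuns();
        // Fail with a clear message if the activity did not run, rather than a NullReferenceException
        var run = _activityRuns.FirstOrDefault(ar => ar.ActivityName == activityName)
            ?? throw new InvalidOperationException($"No run found for activity '{activityName}'");
        var obj = JObject.Parse(run.Output.ToString());
        return obj.SelectToken(propertyPath).ToString();
    }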

I can verify other property values (or the entire JSON output object) as required.
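
As an illustration of reading other properties, here is a small sketch that applies the same JObject.Parse() and SelectToken() calls to a few of the properties from the sample output shown earlier. It assumes the Newtonsoft.Json package, which the article's code already uses; ReadInt() and SampleOutput are hypothetical names introduced for this example.

```csharp
using System;
using Newtonsoft.Json.Linq;

public static class Program
{
    // A few properties taken from the sample Copy Data output shown earlier
    public const string SampleOutput = @"{
        ""dataRead"": 120,
        ""rowsRead"": 10,
        ""rowsCopied"": 10,
        ""copyDuration"": 12
    }";

    // Hypothetical helper: returns the integer at a JSONPath, or null when the path matches nothing.
    public static int? ReadInt(string json, string propertyPath)
    {
        JToken token = JObject.Parse(json).SelectToken(propertyPath);
        return token is null ? (int?)null : (int)token;
    }

    public static void Main()
    {
        // Every row read from the source was copied to the sink
        Console.WriteLine(ReadInt(SampleOutput, "$.rowsRead") == ReadInt(SampleOutput, "$.rowsCopied"));

        // SelectToken() returns null for a path that does not exist
        Console.WriteLine(ReadInt(SampleOutput, "$.noSuchProperty").HasValue);
    }
}
```

Because SelectToken() returns null for a missing path, a test that reads a misspelled property name fails on the null rather than on a wrong value, which is worth remembering when a test fails unexpectedly.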

In the same way that I use [OneTimeSetUp] to run code in a test fixture before any tests are executed, I can specify a [OneTimeTearDown] method for NUnit to dispose resources or undo changes after all of a fixture's tests have run. This is how I arrange for the data factory helper's TearDown() method to be called.

Every test fixture now includes:

  • a [OneTimeSetUp] method to set up the test scenario and run the ADF pipeline
  • one or more [Test]s
  • a [OneTimeTearDown] method which calls the helper's TearDown() method.

This is the [OneTimeTearDown] for my Given10Rows test fixture:

[OneTimeTearDown]
public void TearDown()
{
    _helper?.TearDown();
}

[OneTimeTearDown] also gives me the opportunity to improve connection handling in the database helper, by initialising a connection for the lifetime of a fixture and disposing it at teardown. My database helper now looks like this:

private SqlConnection _conn;
 
public DatabaseHelper()
{
    _conn = new SqlConnection(GetSetting("AdfTestingDbConnectionString"));
    _conn.Open();
}
 
public int RowCount(string tableName)
{
    using (var cmd = new SqlCommand($"SELECT COUNT(*) FROM {tableName}", _conn))
    using (var reader = cmd.ExecuteReader())
    {
        reader.Read();
        return reader.GetInt32(0);
    }
}
 
public override void TearDown()
{
    _conn?.Dispose();
    base.TearDown();
}

I haven't included the full class definition here (it's available in my GitHub repo) but all the important elements are present:

  • the _conn instance variable stores a reference to the database connection
  • the database connection is created and opened in the helper's constructor
  • it is used – but not disposed – in the RowCount() method
  • it is disposed in TearDown(). Notice that the method also calls the data factory helper's TearDown() method, so the test fixture needs only to call _helper.TearDown() to dispose the resources held by both classes.

My Then10RowsAreCopied() unit test looks very similar to the Then10RowsAreStaged() functional test I built in part 3, but each test verifies something different:

  • Then10RowsAreCopied() asserts that the Copy Data activity copies 10 rows
  • Then10RowsAreStaged() asserts that target table [dbo].[Titles] contains 10 rows.

In this case I probably don't need both of these tests, because the pipeline is very simple. A different pipeline might benefit from having both – for example one where the Copy Data activity is followed by a stored procedure call to remove duplicates. Choosing which tests you need – and how many is enough – varies from pipeline to pipeline, and is where testing starts to become more art than science.

In a general-purpose programming language, unit tests should be quick and easy to run from an IDE (in my case Visual Studio) as a developer writes code. This enables fast, frequent feedback which improves code quality. In ADF, the speed of feedback for automated testing is limited, because pipelines must first be published and run in a data factory instance.

In the code for this article, I've separated unit and functional tests for “PL_Stage_Titles_With_Warning” into separate test fixtures. I've done this for clarity, but it has the unfortunate consequence that each test scenario has to be set up and run twice – once for each type of test. This adds redundancy, time and cost to my test suite.

The need to publish ADF pipelines before you can run tests blurs some of the traditional boundary between unit and functional tests. I prefer to write all functional and unit tests for a given ADF pipeline scenario in the same test fixture, because it makes my test suite faster and cheaper to run.
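
A combined fixture might look something like the sketch below, in which one pipeline run in [OneTimeSetUp] serves both kinds of test. To keep the sketch runnable without a data factory or database, it uses FakeStageTitlesHelper, a hypothetical stand-in that returns canned values; in the real solution the fixture would use PLStageTitlesWithWarningHelper instead. The body of Then10RowsAreStaged() is an assumption based on the RowCount() method shown above. NUnit and FluentAssertions are assumed, as in the article's other tests.

```csharp
using System.Threading.Tasks;
using FluentAssertions;
using NUnit.Framework;

// Hypothetical stand-in for the real helper: returns canned values so that the
// sketch runs without Azure resources.
public class FakeStageTitlesHelper
{
    public Task RunPipeline() => Task.CompletedTask;

    public Task<string> GetActivityOutput(string activityName, string propertyPath = "$")
        => Task.FromResult("10");

    public int RowCount(string tableName) => 10;

    public void TearDown() { }
}

public class Given10Rows
{
    private FakeStageTitlesHelper _helper;

    [OneTimeSetUp]
    public async Task WhenPipelineIsRun()
    {
        _helper = new FakeStageTitlesHelper();
        await _helper.RunPipeline();  // a single pipeline run serves every test below
    }

    [Test]  // unit test: what the Copy Data activity reported
    public async Task Then10RowsAreCopied()
    {
        var rowsCopied = await _helper.GetActivityOutput("Copy src_Titles to stg_Titles", "$.rowsCopied");
        int.Parse(rowsCopied).Should().Be(10);
    }

    [Test]  // functional test: what ended up in the target table
    public void Then10RowsAreStaged()
    {
        _helper.RowCount("[dbo].[Titles]").Should().Be(10);
    }

    [OneTimeTearDown]
    public void TearDown() => _helper?.TearDown();
}
```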

In this post I introduced unit testing for ADF pipelines by inspecting pipeline activity runs and outcomes. This is characteristic of a unit test in that it verifies actions taken by the pipeline (whether the pipeline is “doing the right things”) rather than their effects (whether the pipeline is “doing things right”, which is the concern of a functional test).

The tests described here differ from “traditional” unit tests, in that they require the pipeline to be published and executed before tests can be run. Because of this, it is cheaper and faster to run unit and functional tests of the same scenario in a single test fixture.

  • Next up: In the next post I look at collecting activity run information across all test fixtures, in order to calculate test coverage.

  • Code: The code for the series is available on GitHub. The Visual Studio solution specific to this article is in the adf-testing-series/vs/05-UnitTesting folder. It contains three projects: the NUnit project AdfTests along with database projects for the [ExternalSystems] and [AdfTesting] databases. Tables in the [ExternalSystems] database are based on Microsoft's Northwind sample database.

    The GitHub repo includes my “tests.runsettings” file, but its association with the VS solution is not persisted – this is a VS issue. Before running tests for the first time you will need to specify the solution's runsettings file.

  • Share: If you found this article useful, please share it!

adfnovice, 2020/07/17 08:59
Hi Richard,

I was searching for unit testing in ADF for a long time, glad I found this great series of blogs. I have a process related question: though I can appreciate the benefits of unit testing pipelines, is it really worth the effort spent? A typical ADF developer tests his or her pipeline doing debug run. Unit testing is just a rhetorical process in my opinion. Also functional testing of pipelines, if it's based on row count.. it cannot be rowcount always to validate a working of mapping data flow. Data gets grouped, consolidated, filtered etc. In those cases, I am not sure how much of this functional testing would help. I am just asking this question out of curiosity. Would you pls consider responding. thanks.
Richard, 2020/07/23 22:14
Testing during development is important but not enough -- you need to know that your pipeline **keeps** working! https://richardswinbank.net/adf/why_automate_adf_pipeline_testing. What I've presented here isn't intended to meet every test requirement -- if you need to verify something other than row count, you need to write a test to do that :)