Isolated functional tests for Azure Data Factory

This is the third article in my series about automated testing for Azure Data Factory (ADF) pipelines. If this is the first one you've seen, you may prefer to start at the beginning.

In the previous post in this series I used the NUnit testing framework to develop an easy-to-write, simple test for an ADF pipeline. So far, every test I've written runs the pipeline as-is, without separating it from its external dependencies. An integration test like this verifies that communication between the pipeline and its wider environment is properly configured, but can't reliably test whether a pipeline is implemented correctly.

In this article, I'll look at isolating a pipeline from its external dependencies in order to test it independently and with a range of testing scenarios. I'll be trying to establish that a pipeline is “doing things right” in isolation – this is a functional test.

Dependency injection is a technique used in software engineering, in which an object's dependencies are not specified internally, but are instead supplied (“injected”) from elsewhere. This is a form of inversion of control. There are good design reasons for doing this, but it also permits different dependencies to be injected at different times – during testing, for example.
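To make the idea concrete, here is a minimal C# sketch of constructor injection. The names ITitleSource, RealTitleSource, StubTitleSource and TitleStager are hypothetical, invented purely for illustration; they are not part of the ADF testing code in this series.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical abstraction over "somewhere titles come from".
public interface ITitleSource
{
    IEnumerable<string> ReadTitles();
}

// Stand-in for a production dependency (for example, an external database).
public class RealTitleSource : ITitleSource
{
    public IEnumerable<string> ReadTitles() =>
        throw new NotSupportedException("Would query the external system.");
}

// Test-time dependency that returns data the test controls.
public class StubTitleSource : ITitleSource
{
    private readonly int _rowCount;
    public StubTitleSource(int rowCount) { _rowCount = rowCount; }

    public IEnumerable<string> ReadTitles() =>
        Enumerable.Range(1, _rowCount).Select(i => $"Title {i}");
}

// The class under test never chooses its own source: it is injected.
public class TitleStager
{
    private readonly ITitleSource _source;
    public TitleStager(ITitleSource source) { _source = source; }

    // Counts the rows it was given, wherever they came from.
    public int Stage() => _source.ReadTitles().Count();
}
```

Because TitleStager only knows about the interface, a test can construct it with new StubTitleSource(10) and production code can construct it with the real source, without changing TitleStager itself.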

You may already be using dependency injection in ADF, even if you don't call it that. You probably don't define a new ADF dataset for every table in a database – instead you create dataset parameters for table schema and name, and supply runtime values from each pipeline using the dataset. This is dependency injection!

I'm going to use dependency injection to substitute external dependencies of an ADF pipeline with others at test time. Critically, the substituted dependencies will be things that I control, allowing me to construct a broad, reliable set of test scenarios.

Remember the row count test I introduced in the last post? It enabled me to verify that the number of rows staged by an ADF pipeline run was as expected, but was fragile because I had no control of the external source system. I'm going to implement an ADF pipeline in a way that lets me choose a different source table for testing.

A substituted dependency injected for testing purposes is called a test double. A substitute source table which I control is the simplest form of test double and is called a stub. I create a new table in the [AdfTesting] database, which is the database I control. I call the table [test].[TenTitleStub] – as the name suggests, it's a stub that contains ten rows of data.

I need to make some changes in my data factory to support dependency injection:

  • in the pipeline (to allow a test to inject the stub source table and database connection)
  • in the source dataset (to allow the pipeline to inject the stub source table and database connection)
  • in the linked service (to allow the dataset to inject the database connection).

Linked service

I define linked service “LS_ASQL_Parameterised” with a parameter named ConnectionStringSecretName, and I use that parameter to specify the linked service's connection string secret dynamically, referring to it with the expression @linkedService().ConnectionStringSecretName.

This is another good reason to store connection strings in an Azure Key Vault ;-).

Dataset

I define dataset “DS_ASQL_Parameterised” with three parameters:

  • ConnectionStringSecretName
  • TableSchema
  • TableName

The dataset uses the “LS_ASQL_Parameterised” linked service, passing in the value of its own ConnectionStringSecretName parameter (@dataset().ConnectionStringSecretName). The table schema and name for the dataset are supplied by the TableSchema and TableName parameters (@dataset().TableSchema and @dataset().TableName).

ADF pipeline

I create pipeline “PL_Stage_Titles_With_Stub”. This is a clone of the “PL_Stage_Titles” pipeline I introduced in the last post, with a few changes.

The new pipeline has three parameters, _SrcConnectionSecretName, _SrcTableSchema and _SrcTableName. I start each parameter name with an underscore (_) to differentiate them from any other parameters – their sole purpose is to support dependency injection for testing.

Notice that the parameters' default values identify the real source table: [dbo].[Titles], in the [ExternalSystem] database – if I provide no parameter values at runtime, I get the real, design-time implementation.

The Copy data activity passes the incoming parameter values to the source dataset, using expressions of the form @pipeline().parameters._SrcTableSchema.

Now I need to update my testing code to allow me to inject the stub source table. I could just write this into the pipeline helper class, but I'm going to go one better: I'm going to allow the stub to be specified as part of the test setup. This will let me test different scenarios with different stubs.

You may prefer to follow this section in the complete VS solution. All the code in this article is available on GitHub – there's a link at the end.

Test fixture

The structure of my test fixture should look familiar by now: it's called Given10Rows and contains two tests:

  • ThenPipelineOutcomeIsSucceeded() checks that the pipeline ran successfully
  • Then10RowsAreStaged() verifies that the pipeline run staged ten rows.

I've made one change in my [OneTimeSetUp] (line 11) – I've chained a method call onto the helper constructor call: WithSourceTable("test.TenTitleStub"). This is where I specify the stub to be injected.

I'm still using a ThenPipelineOutcomeIsSucceeded() test here because I still want to verify that the pipeline ran successfully. What makes this a functional test is its isolation from the external source system in [OneTimeSetUp].

  1. namespace PL_Stage_Titles_With_Stub.FunctionalTests
  2. {
  3.     public class Given10Rows
  4.     {
  5.         private PLStageTitlesWithStubHelper _helper;
  6.
  7.         [OneTimeSetUp]
  8.         public async Task WhenPipelineIsRun()
  9.         {
  10.             _helper = new PLStageTitlesWithStubHelper()
  11.                 .WithSourceTable("test.TenTitleStub");
  12.             await _helper.RunPipeline();
  13.         }
  14.
  15.         [Test]
  16.         public void ThenPipelineOutcomeIsSucceeded()
  17.         {
  18.             _helper.PipelineOutcome.Should().Be("Succeeded");
  19.         }
  20.
  21.         [Test]
  22.         public void Then10RowsAreStaged()
  23.         {
  24.             _helper.StagedRowCount.Should().Be(10);
  25.         }
  26.     }
  27. }

Why do I specify the source table here (and not in the helper class, for example)? One reason is that I want to be able to use the helper class for multiple test scenarios, using multiple different stubs. Another is that my choice of stub is a key part of this test scenario – so I want it to be clear in the test definition.

The practice of constructing objects using chained .With…() method calls is an implementation of the Fluent Builder design pattern for C#. I'm using a variation on this with the snappy title Fluent Builder Interface With Recursive Generics – I'll explain why later.

Pipeline helper

I'm going to define the WithSourceTable() method in the new pipeline's helper class:

namespace PL_Stage_Titles_With_Stub
{
    public class PLStageTitlesWithStubHelper : DatabaseHelper<PLStageTitlesWithStubHelper>
    {
        public async Task RunPipeline()
        {
            await RunPipeline("PL_Stage_Titles_With_Stub");
        }
 
        public PLStageTitlesWithStubHelper WithSourceTable(string tableName)
        {
            return this
                .WithParameter("_SrcConnectionSecretName", "AdfTestingDbConnectionString")
                .WithParameter("_SrcTableSchema", tableName.Split('.')[0])
                .WithParameter("_SrcTableName", tableName.Split('.')[1]);
        }
 
        public int StagedRowCount
        {
            get
            {
                return RowCount("stg.Titles");
            }
        }
    }
}

The method chains three WithParameter() calls together, passing the source connection secret name, table schema and table name. WithParameter() is defined in the data factory helper class, which probably isn't a surprise: the three calls set the ADF pipeline parameter values to inject the stub source table into the pipeline run.
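WithSourceTable() assumes its argument is always a two-part name: a value without a dot would fail with an IndexOutOfRangeException when Split('.')[1] is evaluated. The sketch below shows one way the split could be validated. TableNameParser and SplitTableName are hypothetical names, not part of the published helper classes.

```csharp
using System;

public static class TableNameParser
{
    // Hypothetical helper: splits "schema.table" into its two parts and
    // fails with a descriptive message instead of an IndexOutOfRangeException.
    public static (string Schema, string Table) SplitTableName(string tableName)
    {
        if (string.IsNullOrWhiteSpace(tableName))
            throw new ArgumentException("Table name is required.", nameof(tableName));

        var parts = tableName.Split('.');
        if (parts.Length != 2 || parts[0].Length == 0 || parts[1].Length == 0)
            throw new ArgumentException(
                $"Expected a two-part name like 'test.TenTitleStub' but got '{tableName}'.",
                nameof(tableName));

        return (parts[0], parts[1]);
    }
}
```

A helper like this would let WithSourceTable() split the name once and pass the Schema and Table values to the two WithParameter() calls.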

Is the pipeline helper the right place for WithSourceTable(), or does the method belong more naturally in DatabaseHelper? I don't think it belongs in the database helper because not all databases act as data sources.

You'll notice that PLStageTitlesWithStubHelper now subclasses a generic DatabaseHelper, which here takes the subclass itself as type argument. This is the “recursive generics” bit of the Fluent Builder Interface… pattern.

Data factory helper

The data factory helper needs to be updated to pass the new parameters into the ADF pipeline. Microsoft's API for running ADF pipelines accepts parameters as a Dictionary<string, object> collection, so I've added a _parameters instance variable of this type on line 3. The _parameters collection is:

  • created on line 46
  • added to by calls to WithParameter() (lines 5 to 9)
  • passed to ADF when the pipeline run is started (line 30).

Here's the revised class:

  1. public class DataFactoryHelper<T> : SettingsHelper<T> where T : DataFactoryHelper<T>
  2. {
  3.     private Dictionary<string, object> _parameters;
  4.
  5.     public T WithParameter(string name, object value)
  6.     {
  7.         _parameters[name] = value;
  8.         return (T)this;
  9.     }
  10.
  11.     public string PipelineOutcome { get; private set; }
  12.
  13.     public async Task RunPipeline(string pipelineName)
  14.     {
  15.         PipelineOutcome = "Unknown";
  16.
  17.         // authenticate against Azure
  18.         var context = new AuthenticationContext("https://login.windows.net/" + GetSetting("AZURE_TENANT_ID"));
  19.         var cc = new ClientCredential(GetSetting("AZURE_CLIENT_ID"), GetSetting("AZURE_CLIENT_SECRET"));
  20.         var authResult = await context.AcquireTokenAsync("https://management.azure.com/", cc);
  21.
  22.         // prepare ADF client
  23.         var cred = new TokenCredentials(authResult.AccessToken);
  24.         using (var adfClient = new DataFactoryManagementClient(cred) { SubscriptionId = GetSetting("AZURE_SUBSCRIPTION_ID") })
  25.         {
  26.             var adfName = GetSetting("DataFactoryName");
  27.             var rgName = GetSetting("DataFactoryResourceGroup");
  28.
  29.             // run pipeline
  30.             var response = await adfClient.Pipelines.CreateRunWithHttpMessagesAsync(rgName, adfName, pipelineName, parameters: _parameters);
  31.             string runId = response.Body.RunId;
  32.
  33.             // wait for pipeline to finish
  34.             var run = await adfClient.PipelineRuns.GetAsync(rgName, adfName, runId);
  35.             while (run.Status == "Queued" || run.Status == "InProgress" || run.Status == "Canceling")
  36.             {
  37.                 await Task.Delay(2000); // pause between polls without blocking the thread
  38.                 run = await adfClient.PipelineRuns.GetAsync(rgName, adfName, runId);
  39.             }
  40.             PipelineOutcome = run.Status;
  41.         }
  42.     }
  43.
  44.     public DataFactoryHelper()
  45.     {
  46.         _parameters = new Dictionary<string, object>();
  47.         PipelineOutcome = "Unknown";
  48.     }
  49. }
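The wait loop in the helper above polls until the run leaves the “Queued”, “InProgress” and “Canceling” states, with no upper limit on how long it waits. The sketch below isolates that loop from the ADF SDK and adds a timeout. PipelineRunPoller, WaitForCompletionAsync and the getStatus delegate are hypothetical names; in the real helper the delegate would wrap the call to adfClient.PipelineRuns.GetAsync().

```csharp
using System;
using System.Threading.Tasks;

public static class PipelineRunPoller
{
    // Polls getStatus until it reports a status other than the in-flight ones
    // used in the helper, or until the (hypothetical) timeout elapses.
    public static async Task<string> WaitForCompletionAsync(
        Func<Task<string>> getStatus, TimeSpan pollInterval, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;
        var status = await getStatus();
        while (status == "Queued" || status == "InProgress" || status == "Canceling")
        {
            if (DateTime.UtcNow >= deadline)
                throw new TimeoutException($"Pipeline run still '{status}' after {timeout}.");

            await Task.Delay(pollInterval);   // pause without blocking the thread
            status = await getStatus();
        }
        return status;
    }
}
```

Passing the status lookup in as a delegate is itself a small piece of dependency injection: the loop can be exercised with a fake sequence of statuses, without touching Azure.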

Now that I can pass a stub table to the pipeline, I can create other stubs to test different scenarios.

I have another table in the [AdfTesting] database called [test].[HundredTitleStub] – it's a stub containing 100 rows of data. I can run the same set of tests for the hundred-row scenario using a very similar test fixture:

namespace PL_Stage_Titles_With_Stub.FunctionalTests
{
    public class Given100Rows
    {
        private PLStageTitlesWithStubHelper _helper;
 
        [OneTimeSetUp]
        public async Task WhenPipelineIsRun()
        {
            _helper = new PLStageTitlesWithStubHelper()
                .WithSourceTable("test.HundredTitleStub");
            await _helper.RunPipeline();
        }
 
        [Test]
        public void ThenPipelineOutcomeIsSucceeded()
        {
            _helper.PipelineOutcome.Should().Be("Succeeded");
        }
 
        [Test]
        public void Then100RowsAreStaged()
        {
            _helper.StagedRowCount.Should().Be(100);
        }
    }
}

I run the tests in the usual way. Looks good!

You can see the different pipeline runs (and the different injected stubs) using the ADF UI's Monitor tab. “PL_Stage_Titles_With_Stub” ran three times: once using its integration test setup, once using [test].[TenTitleStub] and once using [test].[HundredTitleStub].

Just in case you aren't feeling it, this is really exciting! :-D

  • These are the results of running the same pipeline three times, with three different sets of input data.
  • I can change the input data for a scenario simply by specifying a different source table to the pipeline helper, using WithSourceTable() in [OneTimeSetUp].
  • I have absolute control over input data, so I can build new inputs to test as many scenarios as I can come up with.

This is suddenly much more powerful.

Now I'm going to write a test for another feature. Remember that “PL_Stage_Titles” logs the pipeline run start and end?

  • “Log pipeline start” calls database stored procedure [dbo].[LogPipelineStart] to create a log record and return its unique integer ID
  • “Log pipeline end” calls SP [dbo].[LogPipelineEnd], using the returned run ID to update the log record with the staged row count.

The two stored procedures look like this:

CREATE PROCEDURE [dbo].[LogPipelineStart] (
  @pipelineName NVARCHAR(255)
)
AS
 
INSERT INTO dbo.PipelineRun (
  PipelineName
) VALUES (
  @pipelineName
);
 
SELECT SCOPE_IDENTITY() AS RunId;
GO

CREATE PROCEDURE [dbo].[LogPipelineEnd] (
  @runId INT
, @rowsCopied INT
)
AS
 
UPDATE pr
SET RowsCopied = @rowsCopied
  , RunEnd = GETUTCDATE()
FROM dbo.PipelineRun pr
WHERE pr.RunId = @runId;

The success of the “Log pipeline end” activity relies on the value returned by [dbo].[LogPipelineStart] being correctly copied into the pipeline's “RunId” variable and then passed back into [dbo].[LogPipelineEnd] by the “Log pipeline end” activity. A test to make sure that this happens sounds like a good idea.

One way to test this might be to run the pipeline, then to try to work out if a new record was created in [dbo].[PipelineRun] and subsequently updated with a row count. This would be poorly-isolated:

  • In a shared testing environment, I can't be sure that the latest [dbo].[PipelineRun] row was created by my test run (and not by another developer)
  • If either of the stored procedures contains an error, or if the IDENTITY property is missing from [dbo].[PipelineRun], the test might fail even if the pipeline is working properly.

I still want to find out if these database objects contain errors, just not here – database objects need to be supported by their own test suites. If you're not already doing this, I highly recommend tSQLt for automated database testing.

I need to isolate the pipeline from these effects to get a reliable functional test. I'm going to do that using two new test doubles:

  • a stub stored procedure that returns a specific run ID value
  • a stored procedure that accepts the incoming @runId and @rowsCopied and records the run ID in a special log table. This is a more sophisticated form of test double called a spy. (If you're already using tSQLt this idea will be familiar).

Here are their definitions:

CREATE PROC [test].[LogPipelineStartStub] (
  @pipelineName NVARCHAR(255)
)
AS
 
SELECT -17483 AS RunId;
GO

CREATE PROCEDURE [test].[LogPipelineEndSpy] (
  @runId INT
, @rowsCopied INT
)
AS
 
INSERT INTO [test].[LogPipelineEndSpyLog] (
  RunId
) VALUES (
  @runId
);

ADF pipeline

I clone “PL_Stage_Titles_With_Stub” and name the new pipeline “PL_Stage_Titles_With_Spy”. The new stub and spy are injected into the pipeline in the same way that I injected the source table stub – as pipeline parameters (_LogStartSpName and _LogEndSpName) which are then used in the “Log pipeline start” and “Log pipeline end” activities.

Test fixture

I create a test fixture called GivenRunId with a [OneTimeSetUp] specifying that I want to use a stubbed run ID. I add test ThenRunIdIsPassedCorrectly() to check the value logged by the spy procedure:

[OneTimeSetUp]
public async Task WhenPipelineIsRun()
{
    _helper = new PLStageTitlesWithSpyHelper()
        .WithSourceTable("test.TenTitleStub")
        .WithRunIdStub();
    await _helper.RunPipeline();
}
 
[Test]
public void ThenRunIdIsPassedCorrectly()
{
    _helper.RunId.Should().Be(-17483);
}

Pipeline helper

The pipeline helper PLStageTitlesWithSpyHelper is based on PLStageTitlesWithStubHelper and has two additional members:

  • WithRunIdStub() uses the data factory helper's WithParameter() method that I used earlier, but also calls the database helper's WithEmptyTable() method to empty the log table before the test is run
  • the RunId property calls the database helper's ColumnData() method.

public PLStageTitlesWithSpyHelper WithRunIdStub()
{
    return this
        .WithParameter("_LogStartSpName", "test.LogPipelineStartStub")
        .WithParameter("_LogEndSpName", "test.LogPipelineEndSpy")
        .WithEmptyTable("test.LogPipelineEndSpyLog");
}
 
public int RunId
{
    get
    {
        return int.Parse(ColumnData("test.LogPipelineEndSpyLog", "RunId"));
    }
}

Database helper

The DatabaseHelper class has two new methods:

  • WithEmptyTable() takes a table name parameter and truncates the specified database table
  • ColumnData() is a convenient way for me to retrieve the logged run ID value.

public T WithEmptyTable(string tableName)
{
    using (var _conn = new SqlConnection(GetSetting("AdfTestingDbConnectionString")))
    {
        _conn.Open();
        using (var cmd = new SqlCommand($"TRUNCATE TABLE {tableName}", _conn))
            cmd.ExecuteNonQuery();
    }
    return (T)this;
}
 
public string ColumnData(string tableName, string columnName, char separator = ',')
{
    using (var _conn = new SqlConnection(GetSetting("AdfTestingDbConnectionString")))
    {
        _conn.Open();
        using (var cmd = new SqlCommand($"SELECT STRING_AGG([{columnName}],'{separator}') FROM {tableName}", _conn))
        using (var reader = cmd.ExecuteReader())
        {
            reader.Read();
            return reader.GetString(0);
        }
    }           
}

You'll notice that the database helper now contains three methods, each of which opens and closes a connection to a database. This isn't very efficient, but I've already covered a lot of ground in this article – I'll fix it in a later post.
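Both new methods also build their SQL by interpolating table and column names into the command text. Object names can't be supplied as SqlParameter values, so if the names ever came from somewhere less trusted than test code, one option would be to bracket-quote them first. The sketch below is a hypothetical helper (SqlIdentifier is not part of the published solution) that applies the same escaping rule as T-SQL's QUOTENAME() function.

```csharp
using System;
using System.Linq;

public static class SqlIdentifier
{
    // Wraps one identifier in square brackets, doubling any closing bracket,
    // which is the escaping rule T-SQL's QUOTENAME() applies by default.
    public static string Quote(string identifier)
    {
        if (string.IsNullOrEmpty(identifier))
            throw new ArgumentException("Identifier is required.", nameof(identifier));
        return "[" + identifier.Replace("]", "]]") + "]";
    }

    // Quotes each part of a dotted name such as "test.LogPipelineEndSpyLog".
    // Assumes the individual parts contain no dots of their own.
    public static string QuoteMultipart(string dottedName) =>
        string.Join(".", dottedName.Split('.').Select(Quote));
}
```

With this in place, a call such as SqlIdentifier.QuoteMultipart("test.LogPipelineEndSpyLog") produces [test].[LogPipelineEndSpyLog], which could be interpolated into the TRUNCATE TABLE and SELECT statements instead of the raw name.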

Initially I suggested inspecting [dbo].[PipelineRun] as a possible approach to this test. What I've done here is much better-isolated than that, but it's not perfect:

  • I used a stub run ID of -17483 because it's unlikely to arise naturally (and is different from the pipeline variable's default value) – but it's still possible that a bug in the pipeline could output exactly this value. It would be better to be able to inject different run IDs and demonstrate that input and output are consistent over multiple pipeline runs.
  • The test depends on my having exclusive access to the spy log table, which may not be guaranteed. I've written the database helper's ColumnData() method to try to force an error if that happens, but I could still be unlucky.
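The forced error comes from the way the aggregated string is parsed: if another process had also written to the spy log, ColumnData() would return something like "-17483,-17483", which int.Parse() rejects with a FormatException. The sketch below makes the failure modes explicit. SpyLogParsing and ParseSingleRunId are hypothetical names, and the sketch assumes an empty log table surfaces as a null string.

```csharp
using System;
using System.Globalization;

public static class SpyLogParsing
{
    // Takes the comma-separated string that ColumnData() would return and
    // insists that it contains exactly one logged run ID.
    public static int ParseSingleRunId(string columnData)
    {
        if (columnData == null)
            throw new InvalidOperationException(
                "The spy log is empty: the spy procedure does not appear to have been called.");

        var values = columnData.Split(',');
        if (values.Length != 1)
            throw new InvalidOperationException(
                $"Expected one logged run ID but found {values.Length}.");

        return int.Parse(values[0], CultureInfo.InvariantCulture);
    }
}
```

The behaviour for a single value is the same as before, but a test failure caused by a shared log table now says so directly.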

A better-isolated approach would be to generate the stub and spy automatically at test time – that way I could inject a selection of run IDs and use a randomly-generated spy log table name. I may come back to this towards the end of the series.
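As a rough illustration of what generating test doubles at test time might involve, the sketch below builds the T-SQL for a start-procedure stub from a chosen run ID and produces a spy log table name that is unique to one test run. TestDoubleSql and its methods are hypothetical; this is not how the published solution works. The stub procedure name is still fixed here, so concurrent runs would need a unique procedure name too.

```csharp
using System;
using System.Globalization;

public static class TestDoubleSql
{
    // Builds T-SQL for a start-procedure stub that returns the given run ID.
    // CREATE OR ALTER needs SQL Server 2016 SP1 or later, or Azure SQL Database.
    public static string BuildStartStub(int runId)
    {
        var literal = runId.ToString(CultureInfo.InvariantCulture);
        return "CREATE OR ALTER PROCEDURE [test].[LogPipelineStartStub] (\n" +
               "  @pipelineName NVARCHAR(255)\n" +
               ")\nAS\n" +
               $"SELECT {literal} AS RunId;";
    }

    // Returns a spy log table name that is unique to one test run:
    // the fixed prefix followed by 32 hexadecimal digits.
    public static string NewSpyLogTableName() =>
        $"test.LogPipelineEndSpyLog_{Guid.NewGuid():N}";
}
```

A test fixture could then execute the generated script against the [AdfTesting] database in its [OneTimeSetUp], run the pipeline, and assert that the run ID it chose is the one that reaches the spy.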

In this article I've been using ADF pipeline parameters to inject dependencies – you're probably already using them to supply other information to pipelines. The data factory helper's .WithParameter() method allows you to supply your own parameters as part of a test fixture's [OneTimeSetUp]; e.g.:

[OneTimeSetUp]
public async Task WhenPipelineIsRun()
{
    _helper = new PLStageTitlesWithSpyHelper()
        .WithParameter("myStringParam", "hello")
        .WithParameter("myIntParam", 123)
        .WithSourceTable("test.TenTitleStub")
        .WithRunIdStub();
    await _helper.RunPipeline();
}

This is what the Fluent Builder Interface With Recursive Generics pattern is for. I can only chain method calls together for fluent building if every With…() method returns an object of the same type. This wouldn't be true in a traditional inheritance hierarchy, but recursive generics make it possible.
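The following stripped-down sketch shows the mechanism in isolation. ParameterBuilder and StubPipelineBuilder are hypothetical stand-ins for the data factory helper and a pipeline helper; they are not the classes from the solution.

```csharp
using System;
using System.Collections.Generic;

// Base builder: T is the concrete subclass, so With...() can return that type.
public class ParameterBuilder<T> where T : ParameterBuilder<T>
{
    public Dictionary<string, object> Parameters { get; } = new Dictionary<string, object>();

    public T WithParameter(string name, object value)
    {
        Parameters[name] = value;
        return (T)this;   // valid as long as each subclass passes itself as T
    }
}

// Concrete builder: adds its own fluent method on top of the inherited one.
public class StubPipelineBuilder : ParameterBuilder<StubPipelineBuilder>
{
    public StubPipelineBuilder WithSourceTable(string tableName) =>
        WithParameter("_SrcTableName", tableName);
}
```

The chain new StubPipelineBuilder().WithParameter("myIntParam", 123).WithSourceTable("TenTitleStub") compiles because WithParameter() returns StubPipelineBuilder. If the base class were non-generic and WithParameter() returned the base type, the subsequent call to WithSourceTable() would not compile without a cast.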

In this post I built ADF pipelines using pipeline parameter defaults to specify external dependencies. This enabled me to override those dependencies at test time, injecting my own selection of test doubles to ensure functional test isolation. Writing pipelines in this style opens up a world of flexible testing opportunities.

In the first post of this series I described functional testing as verifying that a pipeline is “doing things right”. In a later post I'll look at how to demonstrate that a pipeline is “doing the right things” – something closer to a pipeline unit test. In the next post I'm going to take a break from writing tests and come back to automation – specifically, how to run ADF tests automatically in a CI/CD pipeline.

  • Next up: In the next post in the series I author an Azure DevOps pipeline to run automated tests in response to developer changes.

  • Code: The code for the series is available on GitHub. The Visual Studio solution specific to this article is in the adf-testing-series/vs/03-FunctionalTesting folder. It contains three projects: the NUnit project AdfTests along with database projects for the [ExternalSystems] and [AdfTesting] databases. Tables in the [ExternalSystems] database are based on Microsoft's Northwind sample database.

    The GitHub repo includes my “tests.runsettings” file, but its association with the VS solution is not persisted – this is a VS issue. Before running tests for the first time you will need to specify the solution's runsettings file.

  • Share: If you found this article useful, please share it!

KayBee, 2021/03/04 14:11
Love these posts! Thanks for them.