Commit 10ab4c6

feat: add synapse support (#128)
* feat: add synapse support
* chore: add examples tests for synapse
* docs: add docs for synapse support
1 parent 9c3b637 commit 10ab4c6

29 files changed: +581 -288 lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
@@ -22,7 +22,8 @@ repos:
         exclude: |
           (?x)^(
             tests/functional/utf_16_encoding/pipeline-content.json|
-            examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json
+            examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json|
+            tests/functional/test_framework/data/fabric/pipeline-content.json
           )$
       - id: trailing-whitespace
       - id: mixed-line-ending
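
pre-commit treats `exclude` as a Python regular expression, so the pattern in this hunk can be checked directly with the `re` module. The sketch below copies the three alternatives from the diff and assumes search-style matching; the helper name `is_excluded` is hypothetical and only serves the illustration.

```python
import re

# Verbose-mode pattern copied from the hunk above. The (?x) flag makes the
# regex engine ignore the line breaks and indentation between alternatives.
EXCLUDE = r"""(?x)^(
    tests/functional/utf_16_encoding/pipeline-content.json|
    examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json|
    tests/functional/test_framework/data/fabric/pipeline-content.json
)$"""


def is_excluded(path: str) -> bool:
    """Return True when the path matches one of the excluded files (hypothetical helper)."""
    return re.search(EXCLUDE, path) is not None


if __name__ == "__main__":
    for candidate in (
        "tests/functional/test_framework/data/fabric/pipeline-content.json",
        "README.md",
    ):
        print(candidate, is_excluded(candidate))
```

The `|` appended to the second path is what turns the newly added third path into a separate alternative. Note that the dots are unescaped, so each one matches any single character, not only a literal dot.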

README.md

Lines changed: 5 additions & 1 deletion
@@ -1,6 +1,6 @@
 # Data Factory - Testing Framework :hammer_and_wrench:

-A stand-alone test framework that allows to write unit tests for Data Factory pipelines on [Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/data-factory/) and [Azure Data Factory](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory).
+A stand-alone test framework that allows to write unit tests for Data Factory pipelines on [Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/data-factory/), [Azure Data Factory](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory) and [Azure Synapse Analytics](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory).

 > The framework is currently in _Public Preview_ and is not officially supported by Microsoft.

@@ -84,6 +84,10 @@ Azure Data Factory:
 1. [Copy blobs example](examples/data_factory/copy_blobs/README.md)
 2. [Batch job example](examples/data_factory/batch_job/README.md)

+Azure Synapse Analytics:
+
+1. [Copy blobs example](examples/synapse/copy_blobs/README.md)
+
 ## Contributing :handshake:

 This project welcomes contributions and suggestions. Most contributions require you to agree to a

docs/advanced/development_workflow.md

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
-# Recommended development workflow for Azure Data Factory v2
+# Recommended development workflow for Azure Data Factory (ADF) v2 and Azure Synapse Analytics

-* Use ADF Git integration
+* Use ADF / Azure Synapse Analytics Git integration
 * Use UI to create a feature branch, build the initial pipeline, and save it to the feature branch
 * Pull feature branch locally
 * Start writing unit and functional tests, run them locally for immediate feedback, and fix bugs

docs/basic/repository_setup.md

Lines changed: 3 additions & 2 deletions
@@ -6,12 +6,13 @@ To be able to write tests for data factory, the pipeline and activity definition

 1. [Fabric - Git integration process](https://learn.microsoft.com/fabric/cicd/git-integration/git-integration-process)
 2. [Azure Data Factory - Git integration process](https://learn.microsoft.com/azure/data-factory/source-control)
+3. [Azure Synapse Analytics - Git integration process](https://learn.microsoft.com/en-us/azure/synapse-analytics/cicd/source-control)

-### Alternative for Azure Data Factory
+### Alternative for Azure Data Factory and Azure Synapse Analytics

 To download a single JSON file for testing purposes, follow these steps:

-1. Open the Data Factory instance, and open the pipeline to be tested.
+1. Open the Data Factory or Synapse Analytics instance, and open the pipeline to be tested.
 2. Click on the action ellipses
 3. Click "Download support files"
 4. Extract the zip file containing the pipeline definition in a folder of choice.
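
The last step of the list above (extracting the downloaded zip into a folder of choice) can also be scripted. This is a minimal sketch using only the standard library; the function name `extract_support_files` and both paths are hypothetical, and the internal layout of the support-files archive is not specified in the docs, so the sketch only reports which JSON entries it found.

```python
import zipfile
from pathlib import Path


def extract_support_files(zip_path: str, target_dir: str) -> list[str]:
    """Extract the archive and return the names of the JSON entries it contains.

    Hypothetical helper: the archive layout is an assumption, so no particular
    folder structure is expected inside the zip.
    """
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target)
        return sorted(name for name in archive.namelist() if name.endswith(".json"))
```

The returned list can help when choosing the folder to pass as `root_folder_path` to the test framework.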

examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py

Lines changed: 0 additions & 7 deletions
@@ -4,7 +4,6 @@
 from data_factory_testing_framework.models.activities import Activity, ForEachActivity
 from data_factory_testing_framework.state import (
     PipelineRunState,
-    PipelineRunVariable,
     RunParameter,
     RunParameterType,
 )
@@ -27,9 +26,6 @@ def test_list_blobs(pipeline: Pipeline) -> None:
     # Arrange
     activity = pipeline.get_activity_by_name("List Folders")
     state = PipelineRunState(
-        variables=[
-            PipelineRunVariable(name="SourceContainerName", default_value="source"),
-        ],
         parameters=[
             RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
             RunParameter(
@@ -55,9 +51,6 @@ def test_for_each(pipeline: Pipeline) -> None:
     # Arrange
     activity = pipeline.get_activity_by_name("For Each SourceFolder")
     state = PipelineRunState(
-        variables=[
-            PipelineRunVariable(name="SourceContainerName", default_value="source"),
-        ],
         parameters=[
             RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
             RunParameter(

examples/synapse/copy_blobs/README.md

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+# Copy Blobs
+
+This is an example pipeline which intends to list all the blobs in a given container and copies these blobs to another container
+
+![image](copy_blobs.png)
+
+The pipeline has two activities:
+
+1. **List folders**: Web activity to list all blobs in a container that has a given prefix
+2. **For each activity**: Iterates over each item in the list returned above and executes the sub-activity on each item.
+
+2.1. **Copy files to destination**: Copy activity which copies the blobs to a given destination.
Binary image file (22.6 KB), not shown.
Lines changed: 172 additions & 0 deletions
@@ -0,0 +1,172 @@
+{
+    "name": "copy_blobs",
+    "properties": {
+        "activities": [
+            {
+                "name": "List Folders",
+                "type": "WebActivity",
+                "dependsOn": [],
+                "policy": {
+                    "timeout": "0.12:00:00",
+                    "retry": 0,
+                    "retryIntervalInSeconds": 30,
+                    "secureOutput": false,
+                    "secureInput": false
+                },
+                "userProperties": [],
+                "typeProperties": {
+                    "method": "GET",
+                    "headers": {
+                        "x-ms-version": "2023-01-03"
+                    },
+                    "url": {
+                        "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')",
+                        "type": "Expression"
+                    },
+                    "connectVia": {
+                        "referenceName": "AutoResolveIntegrationRuntime",
+                        "type": "IntegrationRuntimeReference"
+                    },
+                    "authentication": {
+                        "type": "MSI",
+                        "resource": "https://storage.azure.com"
+                    }
+                }
+            },
+            {
+                "name": "For Each SourceFolder",
+                "type": "ForEach",
+                "dependsOn": [
+                    {
+                        "activity": "List Folders",
+                        "dependencyConditions": [
+                            "Succeeded"
+                        ]
+                    }
+                ],
+                "userProperties": [],
+                "typeProperties": {
+                    "items": {
+                        "value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')",
+                        "type": "Expression"
+                    },
+                    "activities": [
+                        {
+                            "name": "Copy files to Destination",
+                            "type": "Copy",
+                            "dependsOn": [],
+                            "policy": {
+                                "timeout": "0.12:00:00",
+                                "retry": 0,
+                                "retryIntervalInSeconds": 30,
+                                "secureOutput": false,
+                                "secureInput": false
+                            },
+                            "userProperties": [],
+                            "typeProperties": {
+                                "source": {
+                                    "type": "ParquetSource",
+                                    "storeSettings": {
+                                        "type": "AzureBlobStorageReadSettings",
+                                        "recursive": true,
+                                        "wildcardFolderPath": {
+                                            "value": "@item()",
+                                            "type": "Expression"
+                                        },
+                                        "wildcardFileName": "*.parquet"
+                                    },
+                                    "formatSettings": {
+                                        "type": "ParquetReadSettings"
+                                    }
+                                },
+                                "sink": {
+                                    "type": "ParquetSink",
+                                    "storeSettings": {
+                                        "type": "AzureBlobStorageWriteSettings",
+                                        "copyBehavior": "FlattenHierarchy"
+                                    },
+                                    "formatSettings": {
+                                        "type": "ParquetWriteSettings"
+                                    }
+                                },
+                                "enableStaging": false,
+                                "translator": {
+                                    "type": "TabularTranslator",
+                                    "typeConversion": true,
+                                    "typeConversionSettings": {
+                                        "allowDataTruncation": true,
+                                        "treatBooleanAsNumber": false
+                                    }
+                                }
+                            },
+                            "inputs": [
+                                {
+                                    "referenceName": "Binary",
+                                    "type": "DatasetReference",
+                                    "parameters": {
+                                        "ServiceURI": {
+                                            "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net')",
+                                            "type": "Expression"
+                                        },
+                                        "ContainerName": {
+                                            "value": "@pipeline().parameters.SourceContainerName",
+                                            "type": "Expression"
+                                        },
+                                        "FolderName": {
+                                            "value": "@pipeline().parameters.SourceFolderPrefix",
+                                            "type": "Expression"
+                                        }
+                                    }
+                                }
+                            ],
+                            "outputs": [
+                                {
+                                    "referenceName": "Binary",
+                                    "type": "DatasetReference",
+                                    "parameters": {
+                                        "ServiceURI": {
+                                            "value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')",
+                                            "type": "Expression"
+                                        },
+                                        "ContainerName": {
+                                            "value": "@pipeline().parameters.SinkContainerName",
+                                            "type": "Expression"
+                                        },
+                                        "FolderName": {
+                                            "value": "@pipeline().parameters.SinkFolderName",
+                                            "type": "Expression"
+                                        }
+                                    }
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        ],
+        "parameters": {
+            "SourceContainerName": {
+                "type": "string"
+            },
+            "SourceFolderPrefix": {
+                "type": "string"
+            },
+            "SinkStorageAccountName": {
+                "type": "string"
+            },
+            "SinkContainerName": {
+                "type": "string"
+            },
+            "SinkFolderName": {
+                "type": "string"
+            },
+            "SourceStorageAccountName": {
+                "type": "string"
+            }
+        },
+        "folder": {
+            "name": "batch"
+        },
+        "annotations": []
+    }
+}
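
The two expressions in this pipeline definition, the `@concat(...)` URL of "List Folders" and the `@xpath(xml(...))` items of "For Each SourceFolder", can be mirrored in plain Python to see what they are expected to produce. This is only a sketch of their semantics, not the framework's expression evaluator; the function names `list_blobs_url` and `blob_prefix_names` and the sample response are assumptions made for illustration.

```python
import xml.etree.ElementTree as ET


def list_blobs_url(account: str, container: str, prefix: str) -> str:
    """Plain-Python stand-in for the @concat(...) expression of "List Folders"."""
    return "".join(
        [
            "https://", account, ".blob.core.windows.net/", container,
            "?restype=container&comp=list&prefix=", prefix,
            "&delimiter=$SourceBlobDelimiter",
        ]
    )


def blob_prefix_names(response_xml: str) -> list[str]:
    """Stand-in for xpath(xml(...), '/EnumerationResults/Blobs/BlobPrefix/Name/text()')."""
    root = ET.fromstring(response_xml.strip())  # root element is <EnumerationResults>
    return [name.text or "" for name in root.findall("./Blobs/BlobPrefix/Name")]


# Hypothetical response body, shaped like a blob listing with two prefixes.
SAMPLE_RESPONSE = """
<EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
    <Blobs>
        <BlobPrefix><Name>testfolder_1/$SourceBlobDelimiter</Name></BlobPrefix>
        <BlobPrefix><Name>testfolder_2/$SourceBlobDelimiter</Name></BlobPrefix>
    </Blobs>
</EnumerationResults>
"""

if __name__ == "__main__":
    print(list_blobs_url("sourcestorageaccount", "sourcecontainer", "sourcefolder"))
    print(blob_prefix_names(SAMPLE_RESPONSE))
```

Each name returned by `blob_prefix_names` corresponds to one iteration of the ForEach activity, where it becomes the `@item()` used as `wildcardFolderPath`.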
Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+import pytest
+from data_factory_testing_framework import TestFramework, TestFrameworkType
+from data_factory_testing_framework.state import (
+    DependencyCondition,
+    RunParameter,
+    RunParameterType,
+)
+
+
+def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None:
+    # Arrange
+    test_framework = TestFramework(
+        framework_type=TestFrameworkType.DataFactory, root_folder_path=request.fspath.dirname
+    )
+    pipeline = test_framework.get_pipeline_by_name("copy_blobs")
+
+    # Act
+    activities = test_framework.evaluate_pipeline(
+        pipeline=pipeline,
+        parameters=[
+            RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorageaccount"),
+            RunParameter(RunParameterType.Pipeline, "SourceContainerName", "sourcecontainer"),
+            RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "sourcefolder"),
+            RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorageaccount"),
+            RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
+            RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
+        ],
+    )
+
+    # Assert
+    list_folder_activity = next(activities)
+    assert list_folder_activity.name == "List Folders"
+    assert (
+        list_folder_activity.type_properties["url"].result
+        == "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter"
+    )
+    assert list_folder_activity.type_properties["method"] == "GET"
+    list_folder_activity.set_result(
+        result=DependencyCondition.SUCCEEDED,
+        output={
+            "Response": """
+                <EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
+                    <Prefix>testfolder</Prefix>
+                    <Delimiter>$SourceBlobDelimiter</Delimiter>
+                    <Blobs>
+                        <BlobPrefix>
+                            <Name>testfolder_1/$SourceBlobDelimiter</Name>
+                        </BlobPrefix>
+                        <BlobPrefix>
+                            <Name>testfolder_2/$SourceBlobDelimiter</Name>
+                        </BlobPrefix>
+                    </Blobs>
+                </EnumerationResults>
+            """
+        },
+    )
+
+    copy_activity = next(activities)
+
+    assert copy_activity.name == "Copy files to Destination"
+    assert copy_activity.type == "Copy"
+    assert (
+        copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
+        == "testfolder_1/$SourceBlobDelimiter"
+    )
+
+    copy_activity = next(activities)
+    assert copy_activity.name == "Copy files to Destination"
+    assert copy_activity.type == "Copy"
+    assert (
+        copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
+        == "testfolder_2/$SourceBlobDelimiter"
+    )
+
+    pytest.raises(StopIteration, lambda: next(activities))
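
The test above steps through `activities` with `next(...)` and finally expects `StopIteration`, which is ordinary Python iterator behaviour. The sketch below reproduces that control flow with a hypothetical stand-in generator, `fake_activities`; it is not the framework's `evaluate_pipeline` and yields plain strings instead of activity objects.

```python
from typing import Iterator


def fake_activities() -> Iterator[str]:
    """Hypothetical stand-in: one web activity, then one copy per listed folder."""
    yield "List Folders"
    for _folder in ("testfolder_1", "testfolder_2"):
        yield "Copy files to Destination"


activities = fake_activities()
names = [next(activities), next(activities), next(activities)]

# A fourth call has nothing left to yield, so the generator raises StopIteration.
try:
    next(activities)
    exhausted = False
except StopIteration:
    exhausted = True
```

Consuming the iterator one step at a time is what lets the real test set the result of "List Folders" before the ForEach items are evaluated.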
