Last active
March 10, 2017 19:21
-
-
Save mlongoria/7931f42c914afb8c85c4c85c1d7ec964 to your computer and use it in GitHub Desktop.
Biml ADF Pipeline Generation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ property name="targetTables" type="DataView"#>
<#@ property name="frequency" type="string"#>
<#@ property name="scope" type="string"#>
<#
    // Emits one Azure Data Factory pipeline JSON document (2015-09-01 pipeline schema)
    // containing a Copy activity for every row of targetTables.
    //
    // Properties:
    //   targetTables - rows must expose SchemaName, TableName, ColumnListForSelect and
    //                  IncrementalPredicate columns (read below).
    //   frequency    - "Daily" or "Hourly"; selects the scheduler block and the pipeline start.
    //   scope        - "Full" or "Deltas"; selects the unfiltered or the windowed source query.
    //
    // Fail fast on unsupported values: without this check an unmatched combination would
    // emit "type": "SqlSource", followed by nothing (a trailing comma, i.e. invalid JSON)
    // and a scheduler section with no "frequency" key.
    if (frequency != "Daily" && frequency != "Hourly")
    {
        throw new System.ArgumentException("frequency must be 'Daily' or 'Hourly' but was '" + frequency + "'.", "frequency");
    }
    if (scope != "Full" && scope != "Deltas")
    {
        throw new System.ArgumentException("scope must be 'Full' or 'Deltas' but was '" + scope + "'.", "scope");
    }

    // Tracks whether a comma separator is needed before the next activity object.
    var isFirst = true;
#>
{
    "$schema": "http://datafactories.schema.management.azure.com/schemas/2015-09-01/Microsoft.DataFactory.Pipeline.json",
    "name": "PL_Copy_MySourceDBToADLS_<#=frequency#>_<#=scope#>",
    "properties": {
        "description": "<#=frequency#> <#=scope#> copies of data from Source db to the data lake.",
        "activities": [
<# foreach (DataRowView rowView in targetTables) {
    DataRow row = rowView.Row;
    string schemaName = row["SchemaName"].ToString();
    string tableName = row["TableName"].ToString();
    string columnList = row["ColumnListForSelect"].ToString();
    // Only used by the Deltas queries; it is placed inside $$Text.Format, which is passed
    // two window arguments below.
    string predicate = row["IncrementalPredicate"].ToString();
#>
            <#=isFirst ? "" : ","#>
            {
                "name": "Copy to Lake - <#=schemaName#>.<#=tableName#>",
                "type": "Copy",
                "inputs": [
                    {
                        "name": "DS_OnPremSQL_MySourceDB_<#=schemaName#>_<#=tableName#>"
                    }
                ],
                "outputs": [
                    {
                        "name": "DS_DataLake_MySourceDB_<#=schemaName#>_<#=tableName#>"
                    }
                ],
                "typeProperties": {
                    "source": {
                        "type": "SqlSource",
<# if (scope == "Full") { #>
                        "sqlReaderQuery": "SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>]"
<# } else if (scope == "Deltas" && frequency == "Hourly") { #>
                        "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', Time.AddHours(WindowStart, -5), Time.AddHours(WindowEnd, -5))"
<# } else if (scope == "Deltas" && frequency == "Daily") { #>
                        "sqlReaderQuery": "$$Text.Format('SELECT <#=columnList#>, SYSDATETIME() AS LoadDateTime FROM <#=schemaName#>.[<#=tableName#>] WHERE <#=predicate#>', WindowStart, WindowEnd)"
<# } #>
                    },
                    "sink": {
                        "type": "AzureDataLakeStoreSink"
                    }
                },
                "policy": {
                    "concurrency": 1,
                    "executionPriorityOrder": "OldestFirst",
                    "retry": 3,
                    "timeout": "01:00:00"
                },
                "scheduler": {
<# if (frequency == "Daily") { #>
                    "frequency": "Day",
                    "offset": "09:00:00",
<# } else if (frequency == "Hourly") { #>
                    "frequency": "Hour",
<# } #>
                    "style": "EndOfInterval",
                    "interval": 1
                }
            }
<# isFirst = false; } #>
        ],
<# if (frequency == "Hourly") { #>
        "start": "2017-03-01T01:00:00",
<# } else { #>
        "start": "2017-03-02T00:00:00",
<# } #>
        "end": "9999-09-09"
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<#@ template tier="10" #>
<#@ import namespace="System.Data" #>
<#@ import namespace="System.Text" #>
<#@ code file="BGHelper.cs" #>
<#@ import namespace="BGHelper" #>
<Biml xmlns="http://schemas.varigence.com/biml.xsd">
</Biml>
<#
    // Driver script: reads the target-table metadata worksheet, splits it into four batches
    // (Daily/Hourly x Full/Deltas) and writes one pair of dataset JSON files per table plus
    // a scheduled and a one-time pipeline JSON file per batch into the ADF project folder.
    string mdFilePath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\metadata";
    string mdFileName = "TargetTableMetadata.xlsx";
    string mdWorkSheetName = "Metadata$";
    bool mdHasHeader = true;
    string logPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\generate_data_factory_biml\\log.txt";
    string adfProjPath = "C:\\Users\\admin\\Source\\Workspaces\\Data Warehouse\\data_factory\\data_factory\\";
    string mdFullPath = System.IO.Path.Combine(mdFilePath, mdFileName);

    // Log the workbook location and whether it exists before reading it, so the log still
    // records both facts if the read below fails.
    System.IO.File.AppendAllText(logPath, "MetaData File Path: " + mdFullPath + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "MetaData File Exists: " + System.IO.File.Exists(mdFullPath).ToString() + Environment.NewLine);

    DataSet ds = ExcelReader.ReadExcelQuery(mdFilePath, mdFileName, mdWorkSheetName, mdHasHeader);
    System.IO.File.AppendAllText(logPath, "Dataset table count: " + ds.Tables.Count.ToString() + Environment.NewLine);

    // DataSet.Tables[name] yields null for an unknown name; surface that with a clear message
    // instead of letting the DataView constructor fail.
    DataTable metadata = ds.Tables["Metadata"];
    if (metadata == null)
    {
        throw new InvalidOperationException("Worksheet '" + mdWorkSheetName + "' in '" + mdFullPath + "' did not produce a table named 'Metadata'.");
    }

    DataView dailyFulls = new DataView(metadata, "Frequency = 'Daily' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
    DataView dailyDeltas = new DataView(metadata, "Frequency = 'Daily' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);
    DataView hourlyFulls = new DataView(metadata, "Frequency = 'Hourly' and [Changes Only] = 'No'", "", DataViewRowState.CurrentRows);
    DataView hourlyDeltas = new DataView(metadata, "Frequency = 'Hourly' and [Changes Only] = 'Yes'", "", DataViewRowState.CurrentRows);

    // Log count of results for each filter.
    System.IO.File.AppendAllText(logPath, "Daily Fulls Count: " + dailyFulls.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Daily Deltas Count: " + dailyDeltas.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Hourly Fulls Count: " + hourlyFulls.Count.ToString() + Environment.NewLine);
    System.IO.File.AppendAllText(logPath, "Hourly Deltas Count: " + hourlyDeltas.Count.ToString() + Environment.NewLine);

    // One entry per batch. Label is used for the per-table log line and the one-time pipeline
    // file name; DatasetScope is the lower-case value handed to the data lake dataset template;
    // PipelineScope is the value handed to the pipeline templates and used in file names.
    var batches = new[]
    {
        new { Label = "DailyFulls",   View = dailyFulls,   Frequency = "Daily",  PipelineScope = "Full",   DatasetScope = "full" },
        new { Label = "DailyDeltas",  View = dailyDeltas,  Frequency = "Daily",  PipelineScope = "Deltas", DatasetScope = "deltas" },
        new { Label = "HourlyFulls",  View = hourlyFulls,  Frequency = "Hourly", PipelineScope = "Full",   DatasetScope = "full" },
        new { Label = "HourlyDeltas", View = hourlyDeltas, Frequency = "Hourly", PipelineScope = "Deltas", DatasetScope = "deltas" }
    };

    // Generate datasets: an on-prem SQL dataset and a data lake dataset per table.
    foreach (var batch in batches)
    {
        foreach (DataRowView rowView in batch.View)
        {
            DataRow row = rowView.Row;
            string schemaName = row["SchemaName"].ToString();
            string tableName = row["TableName"].ToString();
            // Taken from the row (not batch.Frequency) so the templates receive the exact
            // text stored in the worksheet.
            string frequency = row["Frequency"].ToString();

            // The label now names the batch actually being processed.
            System.IO.File.AppendAllText(logPath, batch.Label + " | " + schemaName + "." + tableName + " | " + row["ColumnListForSelect"].ToString() + Environment.NewLine);

            string tableSuffix = schemaName + "_" + tableName + ".json";
            System.IO.File.WriteAllText(
                System.IO.Path.Combine(adfProjPath, "DS_OnPremSQL_MySourceDB_" + tableSuffix),
                CallBimlScript("DS_OnPremSQL_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency));
            System.IO.File.WriteAllText(
                System.IO.Path.Combine(adfProjPath, "DS_DataLake_MySourceDB_" + tableSuffix),
                CallBimlScript("DS_DataLake_MySourceDB_Schema_Table.biml", schemaName, tableName, frequency, batch.DatasetScope));
        }
    }

    // Generate pipelines, e.g. PL_Copy_MySourceDBToADLS_Daily_Full.json.
    foreach (var batch in batches)
    {
        string pipelineFile = "PL_Copy_MySourceDBToADLS_" + batch.Frequency + "_" + batch.PipelineScope + ".json";
        System.IO.File.WriteAllText(
            System.IO.Path.Combine(adfProjPath, pipelineFile),
            CallBimlScript("PL_Copy_MySourceDBToADLS.biml", batch.View, batch.Frequency, batch.PipelineScope));
    }

    // Generate one-time pipelines, e.g. PL_Copy_MySourceDBToADLS_OneTime_DailyFulls.json.
    foreach (var batch in batches)
    {
        string oneTimeFile = "PL_Copy_MySourceDBToADLS_OneTime_" + batch.Label + ".json";
        System.IO.File.WriteAllText(
            System.IO.Path.Combine(adfProjPath, oneTimeFile),
            CallBimlScript("PL_Copy_MySourceDBToADLS_OneTime.biml", batch.View, batch.Frequency, batch.PipelineScope));
    }
#>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment