diff --git a/README.md b/README.md index 7912630..9460594 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,7 @@ bash deploySynapse.sh - Serverless SQL Demo Data Database - Proper service and user permissions for Azure Synapse Analytics Workspace and Azure Data Lake Storage Gen2 - Parquet Auto Ingestion pipeline to optimize data ingestion using best practices +- Lake Database Auto DDL creation (views) for all files used by Ingestion pipeline # Other Files - You can find a Synapse_Dedicated_SQL_Pool_Test_Plan.jmx JMeter file under the artifacts folder that is configured to work with your recently deployed Synapse Environment. diff --git a/artifacts/Lake_Database_Auto_DDL.json.tmpl b/artifacts/Lake_Database_Auto_DDL.json.tmpl new file mode 100644 index 0000000..89eb3cf --- /dev/null +++ b/artifacts/Lake_Database_Auto_DDL.json.tmpl @@ -0,0 +1,417 @@ +{ + "name": "Lake Database Auto DDL", + "properties": { + "activities": [ + { + "name": "Get List of Tables", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat('SELECT *\nFROM OPENROWSET\n(\n BULK ''', pipeline().parameters.StorageAccountNameMetadata, '''\n ,FORMAT = ''CSV''\n ,PARSER_VERSION=''2.0''\n ,HEADER_ROW = TRUE\n) AS r\n')", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": "master" + } + }, + "firstRowOnly": false + } + }, + { + "name": "ForEach Table", + "type": "ForEach", + "dependsOn": [ + { + "activity": "Get List of Tables", + "dependencyConditions": [ + "Succeeded" + ] + }, + { + "activity": "Set variable - PipelineValues", + "dependencyConditions": [ + "Succeeded" + ] + }, + { + "activity": "Create Log Tables If Not Exists", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "items": { + "value": "@activity('Get List of Tables').output.value", + "type": "Expression" + }, + "isSequential": false, + "batchCount": 50, + "activities": [ + { + "name": "Create Staging Table DDL - Serverless", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@replace(replace(replace(variables('SqlScript'), '{SchemaNameStaging}', item().SchemaNameTarget)\n, '{TableNameStaging}', item().TableNameTarget)\n, '{FolderPathFull}', item().FolderPathFull) ", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": "master" + } + }, + "firstRowOnly": true + } + }, + { + "name": "Log DDL", + "type": "Lookup", + "dependsOn": [ + { + "activity": "Create Staging Table DDL - Serverless", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.ServerlessDDL VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''', variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,''', activity('Create Staging Table DDL - Serverless').output.firstRow.SchemaName, ''' /* SchemaName */\n,''', activity('Create Staging Table DDL - Serverless').output.firstRow.TableName, ''' /* TableName */\n,''', replace(activity('Create Staging Table DDL - Serverless').output.firstRow.CreateTableDDL, '''', ''''''), ''' /* SqlCommandCreateExternalTable */\n,''', replace(activity('Create Staging Table DDL - Serverless').output.firstRow.CreateTableStatsDDL, '''', ''''''), ''' /* SqlCommandCreateExternalTableStats */\n,''', replace(activity('Create Staging Table DDL - Serverless').output.firstRow.CreateViewDDL, '''', ''''''), ''' /* SqlCommandCreateView */\n,''', replace(activity('Create Staging Table DDL - Serverless').output.firstRow.CreateViewStatsDDL, '''', ''''''), ''' /* SqlCommandCreateViewStats */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('DatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + }, + { + "name": "Execute Create External Table DDL", + "type": "Lookup", + "dependsOn": [ + { + "activity": "Create Staging Table DDL - Serverless", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat(\nactivity('Create Staging Table DDL - Serverless').output.firstRow.CreateTableDDL\n,'SELECT 1 AS a;'\n)", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('LakeDatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + }, + { + "name": "Execute Create Stats on External Table DDL", + "type": "Lookup", + "dependsOn": [ + { + "activity": "Execute Create External Table DDL", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat(\nactivity('Create Staging Table DDL - Serverless').output.firstRow.CreateTableStatsDDL\n,'SELECT 1 AS a;'\n)", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('LakeDatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + }, + { + "name": "Execute Create View DDL", + "type": "Lookup", + "dependsOn": [ + { + "activity": "Create Staging Table DDL - Serverless", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat(\nactivity('Create Staging Table DDL - Serverless').output.firstRow.CreateViewDDL\n,'SELECT 1 AS a;'\n)", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('LakeDatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + }, + { + "name": "Execute Create View Stats DDL", + "type": "Lookup", + "dependsOn": [ + { + "activity": "Execute Create View DDL", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": { + "value": "@concat(\nactivity('Create Staging Table DDL - Serverless').output.firstRow.CreateViewStatsDDL\n,'SELECT 1 AS a;'\n)", + "type": "Expression" + }, + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '-ondemand.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('LakeDatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + } + ] + } + }, + { + "name": "Set variable - PipelineValues", + "type": "SetVariable", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "variableName": "PipelineValues", + "value": { + "value": "@array(json(concat('{\n\"PipelineRunId\": \"', pipeline().RunId ,'\"'\n,',\"PipelineStartDate\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyyMMdd'), '\"'\n,',\"PipelineStartDateTime\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyy-MM-dd HH:mm:ss'), '\"'\n,'}')))", + "type": "Expression" + } + } + }, + { + "name": "Create Log Tables If Not Exists", + "type": "Lookup", + "dependsOn": [], + "policy": { + "timeout": "7.00:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "SqlDWSource", + "sqlReaderQuery": "IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = 'logging')\n EXEC ('CREATE SCHEMA [logging]')\n;\n\nIF OBJECT_ID('logging.ServerlessDDL', 'U') IS NOT NULL\n DROP TABLE logging.ServerlessDDL\n;\n\nCREATE TABLE logging.ServerlessDDL\n(\n\tId INT IDENTITY(1,1) NOT NULL\n\t,PipelineRunId VARCHAR(50) NOT NULL\n ,PipelineName VARCHAR(100) NOT NULL\n\t,PipelineStartDate INT NOT NULL\n\t,PipelineStartDateTime DATETIME2(0) NOT NULL\n ,StorageAccountNameMetadata NVARCHAR(1000) NULL\n\t,FolderPathFull NVARCHAR(4000) NOT NULL\n\t,SchemaName NVARCHAR(100) NOT NULL\n ,TableName NVARCHAR(100) NOT NULL\n\t,SqlCommandCreateExternalTable NVARCHAR(MAX) NOT NULL\n\t,SqlCommandCreateExternalTableStats NVARCHAR(MAX) NOT NULL\n\t,SqlCommandCreateView NVARCHAR(MAX) NOT NULL\n\t,SqlCommandCreateViewStats NVARCHAR(MAX) NOT NULL\n ,RowInsertDateTime DATETIME2(0) NOT NULL\n)\nWITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX(PipelineStartDate, Id)\n)\n;\nSELECT 1 AS a", + "queryTimeout": "24:00:00", + "partitionOption": "None" + }, + "dataset": { + "referenceName": "DS_Synapse_Managed_Identity", + "type": "DatasetReference", + "parameters": { + "ServerName": { + "value": "@concat(pipeline().DataFactory, '.sql.azuresynapse.net')", + "type": "Expression" + }, + "DatabaseName": { + "value": "@variables('DatabaseName')", + "type": "Expression" + } + } + }, + "firstRowOnly": false + } + } + ], + "parameters": { + "StorageAccountNameMetadata": { + "type": "string", + "defaultValue": "https://REPLACE_DATALAKE_NAME.dfs.core.windows.net/curated/metadata-table-AdventureWorksDW-Trimmed-Minimal.csv" + } + }, + "variables": { + "PipelineValues": { + "type": "Array" + }, + "SqlScript": { + "type": "String", + "defaultValue": "IF OBJECT_ID('tempdb..#tables') IS NOT NULL DROP TABLE #tables; CREATE TABLE #tables ( SchemaName NVARCHAR(100) , TableName NVARCHAR(100) , FolderPath NVARCHAR(1000) ); INSERT INTO #tables VALUES ( '{SchemaNameStaging}' , '{TableNameStaging}' , '{FolderPathFull}' ); IF OBJECT_ID('tempdb..#CreateViewsDDL') IS NOT NULL DROP TABLE #CreateViewsDDL; CREATE TABLE #CreateViewsDDL ( SchemaName NVARCHAR(100) , ViewName NVARCHAR(100) , ViewDDL NVARCHAR(MAX) ); DECLARE @cnt INT = 1 DECLARE @sqlCreateView NVARCHAR(MAX) DECLARE @SchemaName NVARCHAR(100) DECLARE @TableName NVARCHAR(100) DECLARE @FolderPath NVARCHAR(1000); SELECT @SchemaName = SchemaName , @TableName = TableName , @FolderPath = FolderPath , @sqlCreateView = CONCAT ( 'sp_describe_first_result_set @tsql=N''SELECT * FROM OPENROWSET(BULK ''''' , FolderPath , ''''' , FORMAT=''''PARQUET'''') AS r''' ) FROM #tables; IF OBJECT_ID('tempdb..#InformationSchemaTempTable', 'U') IS NOT NULL DROP TABLE #InformationSchemaTempTable; CREATE TABLE #InformationSchemaTempTable ( is_hidden BIT NOT NULL , column_ordinal INT NOT NULL , name SYSNAME NULL , is_nullable BIT NOT NULL , system_type_id INT NOT NULL , system_type_name NVARCHAR(256) NULL , max_length SMALLINT NOT NULL , precision TINYINT NOT NULL , scale TINYINT NOT NULL , collation_name SYSNAME NULL , user_type_id INT NULL , user_type_database SYSNAME NULL , user_type_schema SYSNAME NULL , user_type_name SYSNAME NULL , assembly_qualified_type_name NVARCHAR(4000) , xml_collection_id INT NULL , xml_collection_database SYSNAME NULL , xml_collection_schema SYSNAME NULL , xml_collection_name SYSNAME NULL , is_xml_document BIT NOT NULL , is_case_sensitive BIT NOT NULL , is_fixed_length_clr_type BIT NOT NULL , source_server SYSNAME NULL , source_database SYSNAME NULL , source_schema SYSNAME NULL , source_table SYSNAME NULL , source_column SYSNAME NULL , is_identity_column BIT NULL , is_part_of_unique_key BIT NULL , is_updateable BIT NULL , is_computed_column BIT NULL , is_sparse_column_set BIT NULL , ordinal_in_order_by_list SMALLINT NULL , order_by_list_length SMALLINT NULL , order_by_is_descending SMALLINT NULL , tds_type_id INT NOT NULL , tds_length INT NOT NULL , tds_collation_id INT NULL , tds_collation_sort_id TINYINT NULL ); INSERT INTO #InformationSchemaTempTable EXEC (@sqlCreateView) /*SELECT * FROM #InformationSchemaTempTable*/ DECLARE @GetMaxValueStatement NVARCHAR(MAX) DECLARE @GetColumnList NVARCHAR(MAX) SELECT @GetMaxValueStatement = CONVERT(NVARCHAR(MAX), CONCAT ( 'SELECT ' , STRING_AGG(ColumnMaxLength, ',') , ' FROM OPENROWSET(BULK ''' , @FolderPath , ''' , FORMAT=''PARQUET'') WITH (' , STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnDatatypeWithMax), ',') , ') AS r' )) , @GetColumnList = STRING_AGG(QUOTENAME([name]), ',') FROM ( SELECT CASE WHEN system_type_name LIKE ('%char%') OR system_type_name = 'varbinary(8000)' THEN CONCAT ( 'CONVERT(BIGINT, COALESCE(NULLIF(MAX(DATALENGTH(' , QUOTENAME([name]) , ')), 0), 1)) AS ' , QUOTENAME([name]) ) ELSE CONCAT ( 'COALESCE(CONVERT(BIGINT, SUM(0)), 0) AS ' , QUOTENAME([name]) ) END AS ColumnMaxLength , CASE WHEN system_type_name LIKE ('%char%') THEN CONCAT ( QUOTENAME([name]) , ' ' , REPLACE(system_type_name, '8000', 'MAX') , ' COLLATE Latin1_General_100_BIN2_UTF8' ) WHEN system_type_name = 'varbinary(8000)' THEN CONCAT ( QUOTENAME([name]) , ' ' , REPLACE(system_type_name, '8000', 'MAX') ) ELSE CONCAT ( QUOTENAME([name]) , ' ' , system_type_name ) END AS ColumnDatatypeWithMax , [name] FROM #InformationSchemaTempTable ) AS a /*SELECT @GetMaxValueStatement*/ /*SELECT @GetColumnList*/ DECLARE @sqlUnpivot NVARCHAR(MAX) SET @sqlUnpivot = CONCAT ( 'SELECT ''' , @TableName , ''' AS TABLE_NAME, unpvt.col AS COLUMN_NAME, CASE WHEN unpvt.datatype > 8000 THEN ''MAX'' ELSE CONVERT(NVARCHAR(100), unpvt.datatype) END AS DATATYPE_MAX FROM ( ' , @GetMaxValueStatement , ' ) AS a ' , CHAR(13) , ' UNPIVOT ( datatype FOR col IN ( ' , @GetColumnList , ') ) AS unpvt' ) DROP TABLE IF EXISTS #tmpBus; CREATE TABLE #tmpBus ( TABLE_CLEAN NVARCHAR(1000) , COLUMN_NAME NVARCHAR(1000) , DATATYPE_MAX NVARCHAR(1000) ); INSERT INTO #tmpBus EXEC (@sqlUnpivot) DROP TABLE IF EXISTS #tmpFinal; CREATE TABLE #tmpFinal ( table_name NVARCHAR(1000) , column_name NVARCHAR(1000) , DataType NVARCHAR(1000) , ColumnFullDefinition NVARCHAR(1000) ); INSERT INTO #tmpFinal SELECT @TableName AS table_name , c.[name] AS column_name , UPPER(TYPE_NAME(c.system_type_id)) AS DataType , CONCAT ( c.[name] , ' ' , CASE WHEN TYPE_NAME(c.system_type_id) IN ('int', 'bigint', 'smallint', 'tinyint', 'bit', 'decimal', 'numeric', 'float', 'real', 'datetime2', 'date') THEN UPPER(c.system_type_name) ELSE CONCAT ( UPPER(TYPE_NAME(c.system_type_id)) , '(' , a.DATATYPE_MAX , ') COLLATE Latin1_General_100_BIN2_UTF8' ) END ) AS ColumnFullDefinition FROM #InformationSchemaTempTable AS c JOIN #tmpBus AS a ON a.COLUMN_NAME = c.[name] ORDER BY column_ordinal OFFSET 0 ROWS; DECLARE @createTableDDL NVARCHAR(MAX) DECLARE @createTableStatsDDL NVARCHAR(MAX) DECLARE @createViewDDL NVARCHAR(MAX) DECLARE @createViewStatsDDL NVARCHAR(MAX) DECLARE @openrowsetValue NVARCHAR(MAX) DECLARE @DataSourceName NVARCHAR(MAX) = ( SELECT CONCAT ( 'ds_' , SUBSTRING(FolderPath, CHARINDEX('//', FolderPath) + 2, (CHARINDEX('.', FolderPath) - 9)) ) FROM #tables ) DECLARE @DataSourceDefinition NVARCHAR(MAX) = ( SELECT SUBSTRING(FolderPath, 0, CHARINDEX('/', REPLACE(FolderPath, '//', '')) + 2) FROM #tables ) DECLARE @DataSourcePath NVARCHAR(MAX) = ( SELECT SUBSTRING(FolderPath, CHARINDEX('/', REPLACE(FolderPath, '//', '')) + 2, LEN(FolderPath)) FROM #tables ) DECLARE @DataSourceCreateDDL NVARCHAR(MAX) = ( SELECT CONCAT ( 'IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = ''' , @DataSourceName , ''') CREATE EXTERNAL DATA SOURCE [' , @DataSourceName , '] WITH (LOCATION = ''' , @DataSourceDefinition , ''')' , '' ) ) DECLARE @FileFormatCreateDDL NVARCHAR(MAX) = 'IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = ''SynapseParquetFormat'') CREATE EXTERNAL FILE FORMAT [SynapseParquetFormat] WITH ( FORMAT_TYPE = PARQUET)' DECLARE @CreateSchema NVARCHAR(MAX) = ( SELECT CONCAT ( 'IF NOT EXISTS(SELECT 1 FROM sys.schemas WHERE [name] = ''' , @SchemaName , ''') EXEC(''CREATE SCHEMA ' , QUOTENAME(@SchemaName) , ''');' ) ) SELECT @createTableDDL = CONCAT ( 'CREATE EXTERNAL TABLE ' , QUOTENAME(@SchemaName) , '.' , QUOTENAME(@TableName) , ' (' , STRING_AGG(ColumnFullDefinition, ',') , ') WITH ( LOCATION = ''' , @DataSourcePath , ''', DATA_SOURCE = [' , @DataSourceName , '], FILE_FORMAT = [SynapseParquetFormat])' ) , @createTableStatsDDL = STRING_AGG(CONCAT ( 'CREATE STATISTICS stat_' , column_name , ' ON ' , QUOTENAME(@Schemaname) , '.' , QUOTENAME(@TableName) , ' (' , QUOTENAME(column_name) , ') WITH FULLSCAN, NORECOMPUTE' ), ';') , @createViewDDL = CONCAT ( 'CREATE VIEW ' , QUOTENAME(@SchemaName) , '.[vw' , @TableName , '] AS SELECT * FROM OPENROWSET(BULK ''' , @FolderPath , ''' , FORMAT=''PARQUET'') WITH (' , STRING_AGG(ColumnFullDefinition, ',') , ') AS r' ) , @openrowsetValue = CONCAT ( 'FROM OPENROWSET(BULK ''''' , @FolderPath , ''''', FORMAT=''''PARQUET'''') WITH (' , STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnFullDefinition), ',') ) FROM #tmpFinal; SELECT @createViewStatsDDL = STRING_AGG(CONCAT ( 'EXEC sys.sp_create_openrowset_statistics N''SELECT ' , column_name , ' ' , @openrowsetValue , ') AS r''' ), ';') FROM #tmpFinal; SELECT @SchemaName AS SchemaName , @TableName AS TableName , CONCAT ( @FileFormatCreateDDL , ';' , @DataSourceCreateDDL , ';' , @CreateSchema , ' IF EXISTS(SELECT 1 FROM INFORMATION_SCHEMA.tables WHERE TABLE_SCHEMA = ''' , @SchemaName , ''' AND TABLE_NAME = ''' , @TableName , ''') DROP EXTERNAL TABLE ' , @SchemaName , '.' , @TableName , '; ' , @createTableDDL , ';' ) AS CreateTableDDL , @createTableStatsDDL AS CreateTableStatsDDL , CONCAT ( @CreateSchema , ' IF OBJECT_ID(''' , @SchemaName , '.vw' , @TableName , ''', ''V'') IS NOT NULL DROP VIEW ' , @SchemaName , '.vw' , @TableName , '; EXEC(''' , REPLACE(@createViewDDL, '''', '''''') , ''');' ) AS CreateViewDDL , @createViewStatsDDL AS CreateViewStatsDDL;" + }, + "DatabaseName": { + "type": "String", + "defaultValue": "DataWarehouse" + }, + "LakeDatabaseName": { + "type": "String", + "defaultValue": "Demo Data (Serverless)" + } + }, + "annotations": [] + } +} \ No newline at end of file diff --git a/artifacts/Parquet_Auto_Ingestion.json.tmpl b/artifacts/Parquet_Auto_Ingestion.json.tmpl index b576b5a..66580db 100644 --- a/artifacts/Parquet_Auto_Ingestion.json.tmpl +++ b/artifacts/Parquet_Auto_Ingestion.json.tmpl @@ -8,9 +8,9 @@ "dependsOn": [], "userProperties": [], "typeProperties": { - "variableName": "MasterPipelineValues", + "variableName": "PipelineValues", "value": { - "value": "@array(json(concat('{\n\"MasterPipelineId\": \"', pipeline().RunId ,'\"'\n,',\"MasterPipelineStartDate\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyyMMdd'), '\"'\n,',\"MasterPipelineStartDateTime\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyy-MM-dd HH:mm:ss'), '\"'\n,'}')))", + "value": "@array(json(concat('{\n\"PipelineRunId\": \"', pipeline().RunId ,'\"'\n,',\"PipelineStartDate\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyyMMdd'), '\"'\n,',\"PipelineStartDateTime\": \"', formatDateTime(convertFromUtc(pipeline().TriggerTime, 'Eastern Standard Time'), 'yyyy-MM-dd HH:mm:ss'), '\"'\n,'}')))", "type": "Expression" } } @@ -53,7 +53,7 @@ "value": "@CONCAT('EXECUTE AS user = ''Userstaticrc10'' \nIF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = ''', item().SchemaNameStaging, ''') EXEC(''CREATE SCHEMA ', item().SchemaNameStaging, ''');SELECT 1 AS a')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -89,10 +89,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nEXECUTE AS user = ''Userstaticrc10'' \n\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''',variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().RunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,NULL /* SchemaNameStaging */\n,NULL /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameStaging, ''' /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Create Schema'' /* ActivityType */\n,''', replace(CONCAT('EXECUTE AS user = ''Userstaticrc10'' IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = ''', item().SchemaNameStaging, ''') EXEC(''CREATE SCHEMA ', item().SchemaNameStaging, ''');SELECT 1 AS a'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nEXECUTE AS user = ''Userstaticrc10'' \n\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,NULL /* SchemaNameStaging */\n,NULL /* TableNameStaging */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,''', item().SchemaNameStaging, ''' /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Create Schema'' /* ActivityType */\n,''', replace(CONCAT('EXECUTE AS user = ''Userstaticrc10'' IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = ''', item().SchemaNameStaging, ''') EXEC(''CREATE SCHEMA ', item().SchemaNameStaging, ''');'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -158,7 +158,7 @@ "value": "@replace(replace(replace(variables('SqlCommandCreateStagingTable'), '{SchemaNameStaging}', item().SchemaNameStaging)\n, '{TableNameStaging}', item().TableNameStaging)\n, '{FolderPathFull}', item().FolderPathFull) ", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -198,10 +198,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('EXECUTE AS user = ''Userstaticrc10'' ', activity('Create Staging Table DDL - Serverless').output.value[0].CreateTableDDL)", + "value": "@concat('EXECUTE AS user = ''Userstaticrc10'' ', activity('Create Staging Table DDL - Serverless').output.value[0].CreateTableDDL, '; SELECT 1 AS a')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -244,10 +244,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nEXECUTE AS user = ''Userstaticrc10'' \n\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().GroupId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,NULL /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Create Staging Table'' /* ActivityType */\n,''', replace(concat('EXECUTE AS user = ''Userstaticrc10'' ', activity('Create Staging Table DDL - Serverless').output.value[0].CreateTableDDL), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nEXECUTE AS user = ''Userstaticrc10'' \n\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,NULL /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Create Staging Table'' /* ActivityType */\n,''', replace(concat('EXECUTE AS user = ''Userstaticrc10'' ', activity('Create Staging Table DDL - Serverless').output.value[0].CreateTableDDL), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -290,10 +290,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@CONCAT('EXECUTE AS user = ''Userstaticrc10'' COPY INTO [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] FROM ''', item().FolderPathFull, ''' WITH ( FILE_TYPE = ''PARQUET'', MAXERRORS = 0, COMPRESSION = ''snappy'', IDENTITY_INSERT = ''OFF'', CREDENTIAL = (IDENTITY = ''Managed Identity'')) OPTION (LABEL = ''COPY - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('MasterPipelineValues')[0].MasterPipelineId, ''');SELECT 1 AS a')", + "value": "@CONCAT('EXECUTE AS user = ''Userstaticrc10'' COPY INTO [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] FROM ''', item().FolderPathFull, ''' WITH ( FILE_TYPE = ''PARQUET'', MAXERRORS = 0, COMPRESSION = ''snappy'', IDENTITY_INSERT = ''OFF'', CREDENTIAL = (IDENTITY = ''Managed Identity'')) OPTION (LABEL = ''COPY - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('PipelineValues')[0].PipelineRunId, ''');SELECT 1 AS a')\n", "type": "Expression" }, - "queryTimeout": "12:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -336,10 +336,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().GroupId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,NULL /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Copy Into Staging Table'' /* ActivityType */\n,''', replace(CONCAT('EXECUTE AS user = ''Userstaticrc10'' COPY INTO [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] FROM ''', item().FolderPathFull, ''' WITH ( FILE_TYPE = ''PARQUET'', MAXERRORS = 0, COMPRESSION = ''snappy'', IDENTITY_INSERT = ''OFF'', CREDENTIAL = (IDENTITY = ''Managed Identity''));SELECT 1 AS a'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,''', item().FolderPathFull, ''' /* FolderPathFull */\n,NULL /* SchemaNameTarget */\n,NULL /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Copy Into Staging Table'' /* ActivityType */\n,''', replace(CONCAT('EXECUTE AS user = ''Userstaticrc10'' COPY INTO [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] FROM ''', item().FolderPathFull, ''' WITH ( FILE_TYPE = ''PARQUET'', MAXERRORS = 0, COMPRESSION = ''snappy'', IDENTITY_INSERT = ''OFF'', CREDENTIAL = (IDENTITY = ''Managed Identity''));'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -382,10 +382,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat(replace(replace(replace(variables('GetResourceClass')\n, '{SchemaNameStaging}', item().SchemaNameStaging)\n, '{TableNameStaging}', item().TableNameStaging)\n, '{MasterPipelineId}', variables('MasterPipelineValues')[0].MasterPipelineId)\n)\n", + "value": "@concat(replace(replace(replace(variables('GetResourceClass')\n, '{SchemaNameStaging}', item().SchemaNameStaging)\n, '{TableNameStaging}', item().TableNameStaging)\n, '{PipelineId}', variables('PipelineValues')[0].PipelineRunId)\n)", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -440,10 +440,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().RunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,''', item().TableDistributionTarget, ''' /* TableDistributionTarget */\n,''', item().TableIndexTarget, ''' /* TableIndexTarget */\n\n,''CTAS Final Table'' /* ActivityType */\n,''', replace(CONCAT('/*User Defined Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n\t, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n\t, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] WITH (DISTRIBUTION = ', item().TableDistributionTarget, ' , ', item().TableIndexTarget,' ) AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '];SELECT 1 AS a'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,''', item().TableDistributionTarget, ''' /* TableDistributionTarget */\n,''', item().TableIndexTarget, ''' /* TableIndexTarget */\n\n,''CTAS Final Table'' /* ActivityType */\n,''', replace(CONCAT('/*User Defined Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n\t, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n\t, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] WITH (DISTRIBUTION = ', item().TableDistributionTarget, ' , ', item().TableIndexTarget,' ) AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '];'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -479,10 +479,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@CONCAT('/*User Defined Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] WITH (DISTRIBUTION = ', item().TableDistributionTarget, ' , ', item().TableIndexTarget,' ) AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] OPTION (LABEL = ''CTAS - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('MasterPipelineValues')[0].MasterPipelineId, ''');SELECT 1 AS a')", + "value": "@CONCAT('/*User Defined Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] WITH (DISTRIBUTION = ', item().TableDistributionTarget, ' , ', item().TableIndexTarget,' ) AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] OPTION (LABEL = ''CTAS - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('PipelineValues')[0].PipelineRunId, ''');SELECT 1 AS a')", "type": "Expression" }, - "queryTimeout": "12:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -521,10 +521,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat(replace(replace(replace(replace(replace(replace(variables('CTAS')\n\t, '{ResourceClassName}', activity('Get Resource Class Required').output.firstRow.UserName)\n\t, '{SchemaNameStaging}', item().SchemaNameStaging)\n\t, '{TableNameStaging}', item().TableNameStaging)\n\t, '{MasterPipelineId}', variables('MasterPipelineValues')[0].MasterPipelineId)\n\t, '{MasterPipelineStartDate}', variables('MasterPipelineValues')[0].MasterPipelineStartDate)\n\t, '{MasterPipelineStartDateTime}', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime)\n)", + "value": "@concat(replace(replace(replace(replace(replace(replace(variables('CTAS')\n\t, '{ResourceClassName}', activity('Get Resource Class Required').output.firstRow.UserName)\n\t, '{SchemaNameStaging}', item().SchemaNameStaging)\n\t, '{TableNameStaging}', item().TableNameStaging)\n\t, '{PipelineRunId}', variables('PipelineValues')[0].PipelineRunId)\n\t, '{PipelineStartDate}', variables('PipelineValues')[0].PipelineStartDate)\n\t, '{PipelineStartDateTime}', variables('PipelineValues')[0].PipelineStartDateTime)\n)", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -566,10 +566,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().RunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,''', item().TableDistributionTarget, ''' /* TableDistributionTarget */\n,''', item().TableIndexTarget, ''' /* TableIndexTarget */\n\n,''CTAS Final Table'' /* ActivityType */\n,''', replace(CONCAT('/*Auto Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n\t, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n\t, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] ', activity('Profile Staging Table').output.firstRow.DistributionIndex,' AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '];SELECT 1 AS a'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,''', item().SchemaNameStaging, ''' /* SchemaNameStaging */\n,''', item().TableNameStaging, ''' /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,''', item().TableDistributionTarget, ''' /* TableDistributionTarget */\n,''', item().TableIndexTarget, ''' /* TableIndexTarget */\n\n,''CTAS Final Table'' /* ActivityType */\n,''', replace(CONCAT('/*Auto Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n\t, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n\t, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] ', activity('Profile Staging Table').output.firstRow.DistributionIndex,' AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '];'), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -612,10 +612,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@CONCAT('/*Auto Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] ', activity('Profile Staging Table').output.firstRow.DistributionIndex,' AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] OPTION (LABEL = ''CTAS - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('MasterPipelineValues')[0].MasterPipelineId, ''');SELECT 1 AS a')", + "value": "@CONCAT('/*Auto Distribution/Index*/ EXECUTE AS user = ''', activity('Get Resource Class Required').output.firstRow.UserName\n, ''' IF OBJECT_ID(''', item().SchemaNameTarget, '.', item().TableNameTarget, ''', ''U'') IS NOT NULL DROP TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget\n, ']; CREATE TABLE [', item().SchemaNameTarget, '].[', item().TableNameTarget, '] ', activity('Profile Staging Table').output.firstRow.DistributionIndex,' AS SELECT * FROM [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] OPTION (LABEL = ''CTAS - [', item().SchemaNameStaging, '].[', item().TableNameStaging, '] - ', variables('PipelineValues')[0].PipelineRunId, ''');SELECT 1 AS a')\n", "type": "Expression" }, - "queryTimeout": "12:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -661,10 +661,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@replace(replace(variables('SqlCommandStats')\n, '{SchemaNameTarget}', item().SchemaNameTarget)\n, '{TableNameTarget}', item().TableNameTarget)\n", + "value": "@concat(replace(replace(variables('SqlCommandStats')\n, '{SchemaNameTarget}', item().SchemaNameTarget)\n, '{TableNameTarget}', item().TableNameTarget)\n, '; SELECT 1 AS a'\n)", "type": "Expression" }, - "queryTimeout": "12:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -706,10 +706,10 @@ "source": { "type": "SqlDWSource", "sqlReaderQuery": { - "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('MasterPipelineValues')[0].MasterPipelineId, ''' /* MasterPipelineId */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDate, ''' /* MasterPipelineStartDate */\n,''', variables('MasterPipelineValues')[0].MasterPipelineStartDateTime, ''' /* MasterPipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n,''', pipeline().RunId, ''' /* PipelineId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n\n,NULL /* SchemaNameStaging */\n,NULL /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Stats'' /* ActivityType */\n,''', replace(replace(replace(variables('SqlCommandStats'), '{SchemaNameTarget}', item().SchemaNameTarget)\n\t, '{TableNameTarget}', item().TableNameTarget), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", + "value": "@concat('\nDECLARE @CurrentDateTime DATETIME2(0) = GETDATE() \n\nINSERT INTO logging.AutoIngestion VALUES \n(''', variables('PipelineValues')[0].PipelineRunId, ''' /* PipelineRunId */\n,''', pipeline().Pipeline, ''' /* PipelineName */\n,''',variables('PipelineValues')[0].PipelineStartDate, ''' /* PipelineStartDate */\n,''', variables('PipelineValues')[0].PipelineStartDateTime, ''' /* PipelineStartDateTime */\n,''', pipeline().parameters.StorageAccountNameMetadata, ''' /* StorageAccountNameMetadata */\n\n,NULL /* SchemaNameStaging */\n,NULL /* TableNameStaging */\n,NULL /* FolderPathFull */\n,''', item().SchemaNameTarget, ''' /* SchemaNameTarget */\n,''', item().TableNameTarget, ''' /* TableNameTarget */\n,NULL /* TableDistributionTarget */\n,NULL /* TableIndexTarget */\n\n,''Stats'' /* ActivityType */\n,''', replace(replace(replace(variables('SqlCommandStats'), '{SchemaNameTarget}', item().SchemaNameTarget)\n\t, '{TableNameTarget}', item().TableNameTarget), '''', ''''''), ''' /* SqlCommand */\n,@CurrentDateTime /* RowInsertDateTime */\n)\n;\nSELECT 1 AS a\n;\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -755,7 +755,7 @@ "value": "@concat('\nEXECUTE AS user = ''Userstaticrc10''\n\nDECLARE @sql NVARCHAR(MAX)\n\nSELECT\t\t@sql = CASE WHEN tp.[distribution_policy_desc] = ''REPLICATE'' THEN CONCAT(''SELECT COUNT_BIG(*) FROM ['', s.name, ''].['', t.name, '']'') ELSE NULL END\nFROM\t\tsys.schemas s\nJOIN\t\tsys.tables t\nON\t\t\ts.[schema_id] = t.[schema_id]\nJOIN\t\tsys.indexes i\nON\t\t\tt.[object_id] = i.[object_id]\nAND\t\t\ti.[index_id] <= 1\nJOIN\t\tsys.pdw_table_distribution_properties tp\nON\t\t\tt.[object_id] = tp.[object_id]\nWHERE\t\ts.name = ''', item().SchemaNameTarget, '''\nAND\t\t\tt.name = ''', item().TableNameTarget, '''\n;\n\nIF @sql IS NOT NULL\n\tEXEC (@sql)\n\nSELECT 1 AS a\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -797,7 +797,7 @@ "value": "@concat('\nSELECT *\nFROM\n(\n\tSELECT DISTINCT 1 AS SchemaFlag, ''staging'' AS SchemaNameStaging, NULL AS TableNameStaging, NULL AS FolderPathFull, NULL AS SchemaNameTarget, NULL AS TableNameTarget, NULL AS TableDistributionTarget, NULL AS TableIndexTarget\n\tFROM OPENROWSET\n\t(\n\t\tBULK ''', pipeline().parameters.StorageAccountNameMetadata, '''\n\t\t,FORMAT = ''CSV''\n\t\t,PARSER_VERSION=''2.0''\n\t\t,HEADER_ROW = TRUE\n\t) AS r\n\tUNION\n\tSELECT DISTINCT 1 AS SchemaFlag, SchemaNameTarget, NULL AS TableNameStaging, NULL AS FolderPathFull, NULL AS SchemaNameTarget, NULL AS TableNameTarget, NULL AS TableDistributionTarget, NULL AS TableIndexTarget\n\tFROM OPENROWSET\n\t(\n\t\tBULK ''', pipeline().parameters.StorageAccountNameMetadata, '''\n\t\t,FORMAT = ''CSV''\n\t\t,PARSER_VERSION=''2.0''\n\t\t,HEADER_ROW = TRUE\n\t) AS r\n) AS a\nUNION\nSELECT 0 AS SchemaFlag, ''staging'' AS SchemaNameTarget, TableNameTarget AS TableNameStaging, FolderPathFull, SchemaNameTarget, TableNameTarget, TableDistributionTarget, TableIndexTarget\nFROM OPENROWSET\n(\n BULK ''', pipeline().parameters.StorageAccountNameMetadata, '''\n ,FORMAT = ''CSV''\n ,PARSER_VERSION=''2.0''\n ,HEADER_ROW = TRUE\n) AS r\n')", "type": "Expression" }, - "queryTimeout": "02:00:00", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -887,8 +887,8 @@ "typeProperties": { "source": { "type": "SqlDWSource", - "sqlReaderQuery": "IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = 'logging')\n EXEC ('CREATE SCHEMA [logging]')\n;\n\nIF OBJECT_ID('logging.AutoIngestion', 'U') IS NULL\nCREATE TABLE logging.AutoIngestion \n(\n\tId INT IDENTITY(1,1) NOT NULL\n\t,MasterPipelineId VARCHAR(50) NOT NULL\n\t,MasterPipelineStartDate INT NOT NULL\n\t,MasterPipelineStartDateTime DATETIME2(0) NOT NULL\n\t--,MasterPiplineParameters VARCHAR(1000) NOT NULL\n\t,StorageAccountNameMetadata NVARCHAR(1000) NULL\n\t-- ,StorageAccountKeyMetadata NVARCHAR(100) NULL\n\t-- ,StorageAccountKeyData NVARCHAR(100) NULL\n\t,PipelineRunId VARCHAR(50) NOT NULL\n\t,PipelineName VARCHAR(100) NOT NULL\n\n\t,SchemaNameStaging NVARCHAR(100) NULL\n ,TableNameStaging NVARCHAR(100) NULL\n\t,FolderPathFull NVARCHAR(1000) NULL\n\t,SchemaNameTarget NVARCHAR(100) NULL\n ,TableNameTarget NVARCHAR(100) NULL\n ,TableDistributionTarget NVARCHAR(100) NULL\n\t,TableIndexTarget NVARCHAR(100) NULL\n\n\t,ActivityType VARCHAR(50) NOT NULL --ex 'Create Staging Table', 'Copy Into', 'Build Statistics'\n\t,SqlCommand NVARCHAR(MAX) NOT NULL\n ,RowInsertDateTime DATETIME2(0) NOT NULL\n)\nWITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX(MasterPipelineStartDate)\n)\n;\n\nIF OBJECT_ID('logging.DataProfile', 'U') IS NULL\nCREATE TABLE logging.DataProfile\n(\n\tId INT IDENTITY(1,1) NOT NULL\n\t,MasterPipelineId NVARCHAR(50) NOT NULL\n\t,MasterPipelineStartDate INT NOT NULL\n\t,MasterPipelineStartDateTime DATETIME2(0) NOT NULL\n ,SchemaName NVARCHAR(100) NOT NULL\n ,TableName NVARCHAR(100) NOT NULL\n\t,ColumnName NVARCHAR(100) NOT NULL\n\t,DataTypeName NVARCHAR(100) NOT NULL\n\t,DataTypeFull NVARCHAR(100) NOT NULL\n\t,CharacterLength INT NULL\n\t,PrecisionValue INT NULL\t\n\t,ScaleValue INT NULL\t\n\t,UniqueValueCount BIGINT NOT NULL\n\t,NullCount BIGINT NOT NULL\n\t,MinValue NVARCHAR(MAX)\n\t,MaxValue NVARCHAR(MAX)\n\t,MinLength INT\n\t,MaxLength INT\n\t,DataAverage NUMERIC(30,2)\n\t,DataStdevp FLOAT\n\t,TableRowCount BIGINT NOT NULL\n\t,TableDataSpaceGB NUMERIC(20,2) NOT NULL\n\t,SqlCommand NVARCHAR(MAX) NOT NULL\n ,RowInsertDateTime DATETIME2(0) NOT NULL\n)\nWITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX(TableName)\n)\n;\nSELECT 1 AS a", - "queryTimeout": "02:00:00", + "sqlReaderQuery": "IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE [name] = 'logging')\n EXEC ('CREATE SCHEMA [logging]')\n;\n\nIF OBJECT_ID('logging.AutoIngestion', 'U') IS NULL\nCREATE TABLE logging.AutoIngestion \n(\n\tId INT IDENTITY(1,1) NOT NULL\n\t,PipelineRunId VARCHAR(50) NOT NULL\n\t,PipelineName VARCHAR(100) NOT NULL\n\t,PipelineStartDate INT NOT NULL\n\t,PipelineStartDateTime DATETIME2(0) NOT NULL\n\t,StorageAccountNameMetadata NVARCHAR(1000) NULL\n\t,SchemaNameStaging NVARCHAR(100) NULL\n ,TableNameStaging NVARCHAR(100) NULL\n\t,FolderPathFull NVARCHAR(1000) NULL\n\t,SchemaNameTarget NVARCHAR(100) NULL\n ,TableNameTarget NVARCHAR(100) NULL\n ,TableDistributionTarget NVARCHAR(100) NULL\n\t,TableIndexTarget NVARCHAR(100) NULL\n\t,ActivityType VARCHAR(50) NOT NULL --ex 'Create Staging Table', 'Copy Into', 'Build Statistics'\n\t,SqlCommand NVARCHAR(MAX) NOT NULL\n ,RowInsertDateTime DATETIME2(0) NOT NULL\n)\nWITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX(PipelineStartDateTime, PipelineRunId)\n)\n;\n\nIF OBJECT_ID('logging.DataProfile', 'U') IS NULL\nCREATE TABLE logging.DataProfile\n(\n\tId INT IDENTITY(1,1) NOT NULL\n\t,PipelineRunId NVARCHAR(50) NOT NULL\n\t,PipelineStartDate INT NOT NULL\n\t,PipelineStartDateTime DATETIME2(0) NOT NULL\n ,SchemaName NVARCHAR(100) NOT NULL\n ,TableName NVARCHAR(100) NOT NULL\n\t,ColumnName NVARCHAR(100) NOT NULL\n\t,DataTypeName NVARCHAR(100) NOT NULL\n\t,DataTypeFull NVARCHAR(100) NOT NULL\n\t,CharacterLength INT NULL\n\t,PrecisionValue INT NULL\t\n\t,ScaleValue INT NULL\t\n\t,UniqueValueCount BIGINT NOT NULL\n\t,NullCount BIGINT NOT NULL\n\t,MinValue NVARCHAR(MAX)\n\t,MaxValue NVARCHAR(MAX)\n\t,MinLength INT\n\t,MaxLength INT\n\t,DataAverage NUMERIC(30,2)\n\t,DataStdevp FLOAT\n\t,TableRowCount BIGINT NOT NULL\n\t,TableDataSpaceGB NUMERIC(20,2) NOT NULL\n\t,WeightedScore NUMERIC(30,4)\n\t,SqlCommand NVARCHAR(MAX) NOT NULL\n ,RowInsertDateTime DATETIME2(0) NOT NULL\n)\nWITH (DISTRIBUTION = ROUND_ROBIN, CLUSTERED INDEX(PipelineStartDateTime, PipelineRunId)\n)\n;\nSELECT 1 AS a", + "queryTimeout": "24:00:00", "partitionOption": "None" }, "dataset": { @@ -912,20 +912,20 @@ "parameters": { "StorageAccountNameMetadata": { "type": "string", - "defaultValue": "https://REPLACE_DATALAKE_NAME.dfs.core.windows.net/data/Parquet_Auto_Ingestion_Metadata.csv" + "defaultValue": "https://REPLACE_DATALAKE_NAME.dfs.core.windows.net/curated/metadata-table-AdventureWorksDW-Trimmed-Minimal.csv" } }, "variables": { - "MasterPipelineValues": { + "PipelineValues": { "type": "Array" }, "SqlCommandCreateStagingTable": { "type": "String", - "defaultValue": "IF OBJECT_ID('tempdb..#tables') IS NOT NULL \tDROP TABLE #tables; CREATE TABLE #tables ( \tSchemaName NVARCHAR(100) \t,TableName NVARCHAR(100) \t,FolderPath NVARCHAR(1000) \t); INSERT INTO #tables VALUES ( \t'{SchemaNameStaging}' \t,'{TableNameStaging}' \t,'{FolderPathFull}' \t) IF OBJECT_ID('tempdb..#CreateViewsDDL') IS NOT NULL \tDROP TABLE #CreateViewsDDL; CREATE TABLE #CreateViewsDDL ( \tSchemaName NVARCHAR(100) \t,ViewName NVARCHAR(100) \t,ViewDDL NVARCHAR(MAX) \t); DECLARE @cnt INT = 1 DECLARE @sqlCreateView NVARCHAR(MAX) DECLARE @SchemaName NVARCHAR(100) DECLARE @TableName NVARCHAR(100) DECLARE @FolderPath NVARCHAR(1000) SELECT @SchemaName = SchemaName \t,@TableName = TableName \t,@FolderPath = FolderPath \t,@sqlCreateView = CONCAT ( \t\t'sp_describe_first_result_set @tsql=N''SELECT * FROM OPENROWSET(BULK ''''' \t\t,FolderPath \t\t,''''' , FORMAT=''''PARQUET'''') AS r''' \t\t) FROM #tables; IF OBJECT_ID('tempdb..#InformationSchemaTempTable', 'U') IS NOT NULL \tDROP TABLE #InformationSchemaTempTable; CREATE TABLE #InformationSchemaTempTable ( \tis_hidden BIT NOT NULL \t,column_ordinal INT NOT NULL \t,name SYSNAME NULL \t,is_nullable BIT NOT NULL \t,system_type_id INT NOT NULL \t,system_type_name NVARCHAR(256) NULL \t,max_length SMALLINT NOT NULL \t,precision TINYINT NOT NULL \t,scale TINYINT NOT NULL \t,collation_name SYSNAME NULL \t,user_type_id INT NULL \t,user_type_database SYSNAME NULL \t,user_type_schema SYSNAME NULL \t,user_type_name SYSNAME NULL \t,assembly_qualified_type_name NVARCHAR(4000) \t,xml_collection_id INT NULL \t,xml_collection_database SYSNAME NULL \t,xml_collection_schema SYSNAME NULL \t,xml_collection_name SYSNAME NULL \t,is_xml_document BIT NOT NULL \t,is_case_sensitive BIT NOT NULL \t,is_fixed_length_clr_type BIT NOT NULL \t,source_server SYSNAME NULL \t,source_database SYSNAME NULL \t,source_schema SYSNAME NULL \t,source_table SYSNAME NULL \t,source_column SYSNAME NULL \t,is_identity_column BIT NULL \t,is_part_of_unique_key BIT NULL \t,is_updateable BIT NULL \t,is_computed_column BIT NULL \t,is_sparse_column_set BIT NULL \t,ordinal_in_order_by_list SMALLINT NULL \t,order_by_list_length SMALLINT NULL \t,order_by_is_descending SMALLINT NULL \t,tds_type_id INT NOT NULL \t,tds_length INT NOT NULL \t,tds_collation_id INT NULL \t,tds_collation_sort_id TINYINT NULL \t); INSERT INTO #InformationSchemaTempTable EXEC (@sqlCreateView) /*SELECT * FROM #InformationSchemaTempTable*/ DECLARE @GetMaxValueStatement NVARCHAR(MAX) DECLARE @GetColumnList NVARCHAR(MAX) SELECT @GetMaxValueStatement = CONVERT(NVARCHAR(MAX), CONCAT ( \t\t\t'SELECT ' \t\t\t,STRING_AGG(ColumnMaxLength, ',') \t\t\t,' FROM OPENROWSET(BULK ''' \t\t\t,@FolderPath \t\t\t,''' , FORMAT=''PARQUET'') WITH (' \t\t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnDatatypeWithMax), ',') \t\t\t,') AS r' \t\t\t)) \t,@GetColumnList = STRING_AGG(QUOTENAME([name]), ',') FROM ( \tSELECT CASE \t\t\tWHEN system_type_name LIKE ('%char%') \t\t\t\tOR system_type_name = 'varbinary(8000)' \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\t'CONVERT(BIGINT, COALESCE(NULLIF(MAX(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t\t,')), 0), 1)) AS ' \t\t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t\t) \t\t\tELSE CONCAT ( \t\t\t\t\t'COALESCE(CONVERT(BIGINT, SUM(0)), 0) AS ' \t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t) \t\t\tEND AS ColumnMaxLength \t\t,CASE \t\t\tWHEN system_type_name LIKE ('%char%') \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t\t,' ' \t\t\t\t\t\t,REPLACE(system_type_name, '8000', 'MAX') \t\t\t\t\t\t,' COLLATE Latin1_General_100_BIN2_UTF8' \t\t\t\t\t\t) \t\t\tWHEN system_type_name = 'varbinary(8000)' \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t\t,' ' \t\t\t\t\t\t,REPLACE(system_type_name, '8000', 'MAX') \t\t\t\t\t\t) \t\t\tELSE CONCAT ( \t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t,' ' \t\t\t\t\t,system_type_name \t\t\t\t\t) \t\t\tEND AS ColumnDatatypeWithMax \t\t,[name] \tFROM #InformationSchemaTempTable \t) AS a /*SELECT @GetMaxValueStatement*/ /*SELECT @GetColumnList*/ DECLARE @sqlUnpivot NVARCHAR(MAX) SET @sqlUnpivot = CONCAT ( \t\t'SELECT ''' \t\t,@TableName \t\t,''' AS TABLE_NAME, unpvt.col AS COLUMN_NAME, CASE WHEN unpvt.datatype > 8000 THEN ''MAX'' ELSE CONVERT(NVARCHAR(100), unpvt.datatype) END AS DATATYPE_MAX FROM ( ' \t\t,@GetMaxValueStatement \t\t,' ) AS a ' \t\t,CHAR(13) \t\t,' UNPIVOT ( datatype FOR col IN ( ' \t\t,@GetColumnList \t\t,') ) AS unpvt' \t\t) DROP TABLE IF EXISTS #tmpBus; \tCREATE TABLE #tmpBus ( \t\tTABLE_CLEAN NVARCHAR(1000) \t\t,COLUMN_NAME NVARCHAR(1000) \t\t,DATATYPE_MAX NVARCHAR(1000) \t\t); INSERT INTO #tmpBus EXEC (@sqlUnpivot) DECLARE @createFinalView NVARCHAR(MAX) DECLARE @openrowsetValue NVARCHAR(MAX) SELECT @createFinalView = CONCAT ( \t\t'CREATE TABLE [' \t\t,@SchemaName \t\t,'].[' \t\t,@TableName \t\t,'] (' \t\t,STRING_AGG(ColumnFullDefinition, ',') \t\t,') WITH ( DISTRIBUTION = ROUND_ROBIN, HEAP)' \t\t) \t,@openrowsetValue = CONCAT ( \t\t'FROM OPENROWSET(BULK ''''' \t\t,@FolderPath \t\t,''''', FORMAT=''''PARQUET'''') WITH (' \t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnFullDefinition), ',') \t\t) FROM ( \tSELECT @TableName AS table_name \t\t,c.[name] \t\t,UPPER(TYPE_NAME(c.system_type_id)) AS DataType \t\t,CONCAT ( \t\t\tQUOTENAME(c.[name]) \t\t\t,' ' \t\t\t,CASE \t\t\t\tWHEN TYPE_NAME(c.system_type_id) IN ( \t\t\t\t\t\t'int' \t\t\t\t\t\t,'bigint' \t\t\t\t\t\t,'smallint' \t\t\t\t\t\t,'tinyint' \t\t\t\t\t\t,'bit' \t\t\t\t\t\t,'decimal' \t\t\t\t\t\t,'numeric' \t\t\t\t\t\t,'float' \t\t\t\t\t\t,'real' \t\t\t\t\t\t,'datetime2' \t\t\t\t\t\t,'date' \t\t\t\t\t\t) \t\t\t\t\tTHEN UPPER(c.system_type_name) \t\t\t\tELSE CONCAT ( \t\t\t\t\t\tUPPER(TYPE_NAME(c.system_type_id)) \t\t\t\t\t\t,'(' \t\t\t\t\t\t,a.DATATYPE_MAX \t\t\t\t\t\t,') ' \t\t\t\t\t\t) \t\t\t\tEND \t\t\t) AS ColumnFullDefinition \tFROM #InformationSchemaTempTable AS c \tJOIN #tmpBus AS a ON a.COLUMN_NAME = c.[name] \tORDER BY column_ordinal OFFSET 0 ROWS \t) AS a /*SELECT @createFinalView*/ /*INSERT INTO #CreateViewsDDL*/ SELECT @SchemaName AS SchemaName \t,@TableName AS TableName \t,CONCAT ( \t\t'IF OBJECT_ID(''' \t\t,@SchemaName \t\t,'.' \t\t,@TableName \t\t,''', ''U'') IS NOT NULL DROP TABLE [' \t\t,@SchemaName \t\t,'].[' \t\t,@TableName \t\t,']; ' \t\t,@createFinalView \t\t,'; SELECT 1' \t\t) AS CreateTableDDL;" + "defaultValue": "IF OBJECT_ID('tempdb..#tables') IS NOT NULL \tDROP TABLE #tables; CREATE TABLE #tables ( \tSchemaName NVARCHAR(100) \t,TableName NVARCHAR(100) \t,FolderPath NVARCHAR(1000) \t); INSERT INTO #tables VALUES ( \t'{SchemaNameStaging}' \t,'{TableNameStaging}' \t,'{FolderPathFull}' \t) IF OBJECT_ID('tempdb..#CreateViewsDDL') IS NOT NULL \tDROP TABLE #CreateViewsDDL; CREATE TABLE #CreateViewsDDL ( \tSchemaName NVARCHAR(100) \t,ViewName NVARCHAR(100) \t,ViewDDL NVARCHAR(MAX) \t); DECLARE @cnt INT = 1 DECLARE @sqlCreateView NVARCHAR(MAX) DECLARE @SchemaName NVARCHAR(100) DECLARE @TableName NVARCHAR(100) DECLARE @FolderPath NVARCHAR(1000) SELECT @SchemaName = SchemaName \t,@TableName = TableName \t,@FolderPath = FolderPath \t,@sqlCreateView = CONCAT ( \t\t'sp_describe_first_result_set @tsql=N''SELECT * FROM OPENROWSET(BULK ''''' \t\t,FolderPath \t\t,''''' , FORMAT=''''PARQUET'''') AS r''' \t\t) FROM #tables; IF OBJECT_ID('tempdb..#InformationSchemaTempTable', 'U') IS NOT NULL \tDROP TABLE #InformationSchemaTempTable; CREATE TABLE #InformationSchemaTempTable ( \tis_hidden BIT NOT NULL \t,column_ordinal INT NOT NULL \t,name SYSNAME NULL \t,is_nullable BIT NOT NULL \t,system_type_id INT NOT NULL \t,system_type_name NVARCHAR(256) NULL \t,max_length SMALLINT NOT NULL \t,precision TINYINT NOT NULL \t,scale TINYINT NOT NULL \t,collation_name SYSNAME NULL \t,user_type_id INT NULL \t,user_type_database SYSNAME NULL \t,user_type_schema SYSNAME NULL \t,user_type_name SYSNAME NULL \t,assembly_qualified_type_name NVARCHAR(4000) \t,xml_collection_id INT NULL \t,xml_collection_database SYSNAME NULL \t,xml_collection_schema SYSNAME NULL \t,xml_collection_name SYSNAME NULL \t,is_xml_document BIT NOT NULL \t,is_case_sensitive BIT NOT NULL \t,is_fixed_length_clr_type BIT NOT NULL \t,source_server SYSNAME NULL \t,source_database SYSNAME NULL \t,source_schema SYSNAME NULL \t,source_table SYSNAME NULL \t,source_column SYSNAME NULL \t,is_identity_column BIT NULL \t,is_part_of_unique_key BIT NULL \t,is_updateable BIT NULL \t,is_computed_column BIT NULL \t,is_sparse_column_set BIT NULL \t,ordinal_in_order_by_list SMALLINT NULL \t,order_by_list_length SMALLINT NULL \t,order_by_is_descending SMALLINT NULL \t,tds_type_id INT NOT NULL \t,tds_length INT NOT NULL \t,tds_collation_id INT NULL \t,tds_collation_sort_id TINYINT NULL \t); INSERT INTO #InformationSchemaTempTable EXEC (@sqlCreateView) /*SELECT * FROM #InformationSchemaTempTable*/ DECLARE @GetMaxValueStatement NVARCHAR(MAX) DECLARE @GetColumnList NVARCHAR(MAX) SELECT @GetMaxValueStatement = CONVERT(NVARCHAR(MAX), CONCAT ( \t\t\t'SELECT ' \t\t\t,STRING_AGG(ColumnMaxLength, ',') \t\t\t,' FROM OPENROWSET(BULK ''' \t\t\t,@FolderPath \t\t\t,''' , FORMAT=''PARQUET'') WITH (' \t\t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnDatatypeWithMax), ',') \t\t\t,') AS r' \t\t\t)) \t,@GetColumnList = STRING_AGG(QUOTENAME([name]), ',') FROM ( \tSELECT CASE \t\t\tWHEN system_type_name LIKE ('%char%') \t\t\t\tOR system_type_name = 'varbinary(8000)' \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\t'CONVERT(BIGINT, COALESCE(NULLIF(MAX(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t\t,')), 0), 1)) AS ' \t\t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t\t) \t\t\tELSE CONCAT ( \t\t\t\t\t'COALESCE(CONVERT(BIGINT, SUM(0)), 0) AS ' \t\t\t\t\t,QUOTENAME([name]) \t\t\t\t\t) \t\t\tEND AS ColumnMaxLength \t\t,CASE \t\t\tWHEN system_type_name LIKE ('%char%') \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t\t,' ' \t\t\t\t\t\t,REPLACE(system_type_name, '8000', 'MAX') \t\t\t\t\t\t,' COLLATE Latin1_General_100_BIN2_UTF8' \t\t\t\t\t\t) \t\t\tWHEN system_type_name = 'varbinary(8000)' \t\t\t\tTHEN CONCAT ( \t\t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t\t,' ' \t\t\t\t\t\t,REPLACE(system_type_name, '8000', 'MAX') \t\t\t\t\t\t) \t\t\tELSE CONCAT ( \t\t\t\t\tQUOTENAME([name]) \t\t\t\t\t,' ' \t\t\t\t\t,system_type_name \t\t\t\t\t) \t\t\tEND AS ColumnDatatypeWithMax \t\t,[name] \tFROM #InformationSchemaTempTable \t) AS a /*SELECT @GetMaxValueStatement*/ /*SELECT @GetColumnList*/ DECLARE @sqlUnpivot NVARCHAR(MAX) SET @sqlUnpivot = CONCAT ( \t\t'SELECT ''' \t\t,@TableName \t\t,''' AS TABLE_NAME, unpvt.col AS COLUMN_NAME, CASE WHEN unpvt.datatype > 8000 THEN ''MAX'' ELSE CONVERT(NVARCHAR(100), unpvt.datatype) END AS DATATYPE_MAX FROM ( ' \t\t,@GetMaxValueStatement \t\t,' ) AS a ' \t\t,CHAR(13) \t\t,' UNPIVOT ( datatype FOR col IN ( ' \t\t,@GetColumnList \t\t,') ) AS unpvt' \t\t) DROP TABLE IF EXISTS #tmpBus; \tCREATE TABLE #tmpBus ( \t\tTABLE_CLEAN NVARCHAR(1000) \t\t,COLUMN_NAME NVARCHAR(1000) \t\t,DATATYPE_MAX NVARCHAR(1000) \t\t); INSERT INTO #tmpBus EXEC (@sqlUnpivot) DECLARE @createFinalView NVARCHAR(MAX) DECLARE @openrowsetValue NVARCHAR(MAX) SELECT @createFinalView = CONCAT ( \t\t'CREATE TABLE ' \t\t,QUOTENAME(@SchemaName) \t\t,'.' \t\t,QUOTENAME(@TableName) \t\t,' (' \t\t,STRING_AGG(ColumnFullDefinition, ',') \t\t,') WITH ( DISTRIBUTION = ROUND_ROBIN, HEAP)' \t\t) \t,@openrowsetValue = CONCAT ( \t\t'FROM OPENROWSET(BULK ''''' \t\t,@FolderPath \t\t,''''', FORMAT=''''PARQUET'''') WITH (' \t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), ColumnFullDefinition), ',') \t\t) FROM ( \tSELECT @TableName AS table_name \t\t,c.[name] \t\t,UPPER(TYPE_NAME(c.system_type_id)) AS DataType \t\t,CONCAT ( \t\t\tQUOTENAME(c.[name]) \t\t\t,' ' \t\t\t,CASE \t\t\t\tWHEN TYPE_NAME(c.system_type_id) IN ( \t\t\t\t\t\t'int' \t\t\t\t\t\t,'bigint' \t\t\t\t\t\t,'smallint' \t\t\t\t\t\t,'tinyint' \t\t\t\t\t\t,'bit' \t\t\t\t\t\t,'decimal' \t\t\t\t\t\t,'numeric' \t\t\t\t\t\t,'float' \t\t\t\t\t\t,'real' \t\t\t\t\t\t,'datetime2' \t\t\t\t\t\t,'date' \t\t\t\t\t\t) \t\t\t\t\tTHEN UPPER(c.system_type_name) \t\t\t\tELSE CONCAT ( \t\t\t\t\t\tUPPER(TYPE_NAME(c.system_type_id)) \t\t\t\t\t\t,'(' \t\t\t\t\t\t,a.DATATYPE_MAX \t\t\t\t\t\t,') ' \t\t\t\t\t\t) \t\t\t\tEND \t\t\t) AS ColumnFullDefinition \tFROM #InformationSchemaTempTable AS c \tJOIN #tmpBus AS a ON a.COLUMN_NAME = c.[name] \tORDER BY column_ordinal OFFSET 0 ROWS \t) AS a /*SELECT @createFinalView*/ /*INSERT INTO #CreateViewsDDL*/ SELECT @SchemaName AS SchemaName \t,@TableName AS TableName \t,CONCAT ( \t\t'IF OBJECT_ID(''' \t\t,QUOTENAME(@SchemaName) \t\t,'.' \t\t,QUOTENAME(@TableName) \t\t,''', ''U'') IS NOT NULL DROP TABLE ' \t\t,QUOTENAME(@SchemaName) \t\t,'.' \t\t,QUOTENAME(@TableName) \t\t,'; ' \t\t,@createFinalView \t\t,';' \t\t) AS CreateTableDDL;" }, "SqlCommandStats": { "type": "String", - "defaultValue": "EXECUTE AS user = 'Userstaticrc10' DECLARE @sql NVARCHAR(MAX) SELECT @sql = STRING_AGG(CONVERT(NVARCHAR(MAX), CONCAT ( \t\t\t\tDropStatDDL \t\t\t\t,create_stat_ddl \t\t\t\t)), ';') FROM ( \tSELECT DISTINCT t.[name] AS [table_name] \t\t,s.[name] AS [table_schema_name] \t\t,c.[name] AS [column_name] \t\t,c.[column_id] AS [column_id] \t\t,t.[object_id] AS [object_id] \t\t,CASE \t\t\tWHEN l.[object_id] IS NULL \t\t\t\tTHEN 0 \t\t\tELSE 1 \t\t\tEND AS StatExistsFlag \t\t,st.name AS StatName \t\t,CAST('CREATE STATISTICS ' + QUOTENAME('stat_' + s.[name] + '_' + t.[name] + '_' + c.[name]) + ' ON ' + QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name]) + '(' + QUOTENAME(c.[name]) + ') WITH FULLSCAN' AS VARCHAR(8000)) AS create_stat_ddl \t\t,CASE \t\t\tWHEN st.name IS NULL \t\t\t\tTHEN '' \t\t\tELSE CONCAT ( \t\t\t\t\t'DROP STATISTICS ' \t\t\t\t\t,QUOTENAME(s.[name]) \t\t\t\t\t,'.' \t\t\t\t\t,QUOTENAME(t.[name]) \t\t\t\t\t,'.' \t\t\t\t\t,QUOTENAME(st.name) \t\t\t\t\t,';' \t\t\t\t\t) \t\t\tEND AS DropStatDDL \tFROM sys.[tables] AS t \tJOIN sys.[schemas] AS s ON t.[schema_id] = s.[schema_id] \tJOIN sys.[columns] AS c ON t.[object_id] = c.[object_id] \tJOIN sys.types AS tp ON c.user_type_id = tp.user_type_id \tLEFT JOIN sys.[stats_columns] l ON l.[object_id] = c.[object_id] \t\tAND l.[column_id] = c.[column_id] \t\tAND l.[stats_column_id] = 1 \tLEFT JOIN sys.stats AS st ON st.[object_id] = t.[object_id] \t\tAND st.stats_id = l.stats_id \tLEFT JOIN sys.indexes AS i ON i.[object_id] = t.[object_id] \t\tAND i.index_id = st.stats_id \tLEFT JOIN sys.[external_tables] AS e ON e.[object_id] = t.[object_id] \tWHERE e.[object_id] IS NULL /* not an external table */ \t\tAND i.[object_id] IS NULL \t\tAND c.max_length < 100 \t\tAND ( \t\t\t( \t\t\t\ttp.[name] IN ( \t\t\t\t\t'decimal' \t\t\t\t\t,'numeric' \t\t\t\t\t) \t\t\t\tAND c.scale = 0 \t\t\t\t) \t\t\tOR tp.[name] NOT IN ( \t\t\t\t'decimal' \t\t\t\t,'numeric' \t\t\t\t,'float' \t\t\t\t) \t\t\t) \t\tAND s.[name] = '{SchemaNameTarget}' \t\tAND t.[name] = '{TableNameTarget}' \t) AS a EXEC (@sql); SELECT 1 AS a" + "defaultValue": "EXECUTE AS user = 'Userstaticrc10' DECLARE @sql NVARCHAR(MAX) SELECT @sql = STRING_AGG(CONVERT(NVARCHAR(MAX), CONCAT ( DropStatDDL , create_stat_ddl )), ';') FROM ( SELECT DISTINCT t.[name] AS [table_name] , s.[name] AS [table_schema_name] , c.[name] AS [column_name] , c.[column_id] AS [column_id] , t.[object_id] AS [object_id] , CASE WHEN l.[object_id] IS NULL THEN 0 ELSE 1 END AS StatExistsFlag , st.name AS StatName , CAST('CREATE STATISTICS ' + QUOTENAME('stat_' + s.[name] + '_' + t.[name] + '_' + c.[name]) + ' ON ' + QUOTENAME(s.[name]) + '.' + QUOTENAME(t.[name]) + '(' + QUOTENAME(c.[name]) + ') WITH FULLSCAN' AS VARCHAR(8000)) AS create_stat_ddl , CASE WHEN st.name IS NULL THEN '' ELSE CONCAT ( 'DROP STATISTICS ' , QUOTENAME(s.[name]) , '.' , QUOTENAME(t.[name]) , '.' , QUOTENAME(st.name) , ';' ) END AS DropStatDDL FROM sys.[tables] AS t JOIN sys.[schemas] AS s ON t.[schema_id] = s.[schema_id] JOIN sys.[columns] AS c ON t.[object_id] = c.[object_id] JOIN sys.types AS tp ON c.user_type_id = tp.user_type_id LEFT JOIN sys.[stats_columns] l ON l.[object_id] = c.[object_id] AND l.[column_id] = c.[column_id] AND l.[stats_column_id] = 1 LEFT JOIN sys.stats AS st ON st.[object_id] = t.[object_id] AND st.stats_id = l.stats_id LEFT JOIN sys.indexes AS i ON i.[object_id] = t.[object_id] AND i.index_id = st.stats_id LEFT JOIN sys.[external_tables] AS e ON e.[object_id] = t.[object_id] WHERE e.[object_id] IS NULL /* not an external table */ AND i.[object_id] IS NULL AND c.max_length < 100 AND ( ( tp.[name] IN ('decimal', 'numeric') AND c.scale = 0 ) OR tp.[name] NOT IN ('decimal', 'numeric', 'float') ) AND s.[name] = '{SchemaNameTarget}' AND t.[name] = '{TableNameTarget}' ) AS a EXEC (@sql);" }, "GetResourceClass": { "type": "String", @@ -933,11 +933,11 @@ }, "CTAS": { "type": "String", - "defaultValue": "EXECUTE AS user = '{ResourceClassName}' DECLARE @SchemaNameStaging NVARCHAR(MAX) = '{SchemaNameStaging}' \t,@TableNameStaging NVARCHAR(MAX) = '{TableNameStaging}' \t,@RowCount BIGINT \t,@TableDataSpaceGB DECIMAL(36, 2); WITH base AS ( \tSELECT s.name AS [schema_name] \t\t,t.name AS [table_name] \t\t,QUOTENAME(s.name) + '.' + QUOTENAME(t.name) AS [two_part_name] \t\t,tp.[distribution_policy_desc] AS [distribution_policy_name] \t\t,c.[name] AS [distribution_column] \t\t,i.[type_desc] AS [index_type_desc] \t\t,[reserved_page_count] AS reserved_space_page_count \t\t,[used_data] AS data_space_page_count \t\t,([reserved_page_count] - ([used_data] + ([reserved_page_count] - [used_page_count]))) AS index_space_page_count \t\t,([reserved_page_count] - [used_page_count]) AS unused_space_page_count \t\t,nps.[row_count] AS [row_count] \tFROM sys.schemas s \tJOIN sys.tables t ON s.[schema_id] = t.[schema_id] \tJOIN sys.indexes i ON t.[object_id] = i.[object_id] \t\tAND i.[index_id] <= 1 \tJOIN sys.pdw_table_distribution_properties tp ON t.[object_id] = tp.[object_id] \tJOIN sys.pdw_table_mappings tm ON t.[object_id] = tm.[object_id] \tJOIN sys.pdw_nodes_tables nt ON tm.[physical_name] = nt.[name] \tJOIN sys.dm_pdw_nodes pn ON nt.[pdw_node_id] = pn.[pdw_node_id] \tJOIN sys.pdw_distributions di ON nt.[distribution_id] = di.[distribution_id] \tJOIN ( \t\tSELECT [object_id] \t\t\t,[pdw_node_id] \t\t\t,[distribution_id] \t\t\t,SUM(row_count) AS row_count \t\t\t,SUM([reserved_page_count]) AS [reserved_page_count] \t\t\t,SUM([used_data]) AS [used_data] \t\t\t,SUM([used_page_count]) AS [used_page_count] \t\tFROM ( \t\t\tSELECT [object_id] \t\t\t\t,[pdw_node_id] \t\t\t\t,[distribution_id] \t\t\t\t,MAX(row_count) AS row_count \t\t\t\t,SUM([reserved_page_count]) AS [reserved_page_count] \t\t\t\t,MAX([used_page_count]) AS [used_data] \t\t\t\t,SUM([used_page_count]) AS [used_page_count] \t\t\tFROM sys.dm_pdw_nodes_db_partition_stats \t\t\tGROUP BY [object_id] \t\t\t\t,[pdw_node_id] \t\t\t\t,[distribution_id] \t\t\t) AS a \t\tGROUP BY [object_id] \t\t\t,[pdw_node_id] \t\t\t,[distribution_id] \t\t) AS nps ON nt.[object_id] = nps.[object_id] \t\tAND nt.[pdw_node_id] = nps.[pdw_node_id] \t\tAND nt.[distribution_id] = nps.[distribution_id] \tLEFT JOIN ( \t\tSELECT * \t\tFROM sys.pdw_column_distribution_properties \t\tWHERE distribution_ordinal = 1 \t\t) cdp ON t.[object_id] = cdp.[object_id] \tLEFT JOIN sys.columns c ON cdp.[object_id] = c.[object_id] \t\tAND cdp.[column_id] = c.[column_id] \tWHERE pn.[type] = 'COMPUTE' \t\tAND s.name = @SchemaNameStaging \t\tAND t.name = @TableNameStaging \t) \t,size AS ( \tSELECT [schema_name] \t\t,[table_name] \t\t,[distribution_policy_name] \t\t,[distribution_column] \t\t,[index_type_desc] \t\t,[index_space_page_count] \t\t,([reserved_space_page_count] * 8.0) / 1000000 AS [reserved_space_GB] \t\t,([unused_space_page_count] * 8.0) / 1000000 AS [unused_space_GB] \t\t,([data_space_page_count] * 8.0) / 1000000 AS [data_space_GB] \t\t,([index_space_page_count] * 8.0) / 1000000 AS [index_space_GB] \t\t,[row_count] AS row_count \tFROM base \t) SELECT @RowCount = SUM(row_count) /*AS [RowCount]*/ \t,@TableDataSpaceGB = SUM(data_space_GB) /*AS TableDataSpaceGB*/ FROM size GROUP BY schema_name \t,table_name \t,distribution_policy_name \t,distribution_column \t,index_type_desc ORDER BY schema_name \t,table_name \t,SUM(reserved_space_GB) DESC;/*SELECT @RowCount, @TableDataSpaceGB*/ DECLARE @sql NVARCHAR(MAX) \t,@Distribution NVARCHAR(MAX) \t,@IndexType NVARCHAR(MAX) SELECT @sql = sqlString FROM ( \tSELECT CONCAT ( \t\t\t'IF OBJECT_ID(''tempdb..#temp'') IS NOT NULL DROP TABLE #temp; CREATE TABLE #temp WITH (DISTRIBUTION = ROUND_ROBIN, HEAP) AS \t\t\t\t\tSELECT \t\t\t\t\t\t\t* \t\t\t\t\t\t\t,CASE WHEN DataTypeName = ''int'' THEN 1 \t\t\t\t\t\t\t\tWHEN DataTypeName IN (''numeric'', ''decimal'') AND ScaleValue = 0 THEN 1 \t\t\t\t\t\t\t\tWHEN DataTypeName = ''bigint'' THEN 2 \t\t\t\t\t\t\t\tWHEN DataTypeName = ''smallint'' THEN 2 \t\t\t\t\t\t\t\tWHEN DataTypeName IN (''char'', ''varchar'', ''nvarchar'') \t\t\t\t\t\t\t\t\tTHEN \t\t\t\t\t\t\t\t\t\tCASE WHEN CharacterLength <= 5 THEN 3 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 10 THEN 4 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 20 THEN 6 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 50 THEN 8 \t\t\t\t\t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t+ \t\t\t\t\t\t\tCASE WHEN NullCountPercentage < 0.01 THEN 1 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.02 THEN 2 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.03 THEN 3 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.04 THEN 4 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.05 THEN 5 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.06 THEN 6 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.07 THEN 7 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.08 THEN 8 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.09 THEN 9 \t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t+ \t\t\t\t\t\t\tCASE WHEN UniqueValueCount > 60 AND UniqueValueCountPercentage > 0.000001 THEN 1 ELSE 2\t\t\t\t\t\t\tEND + RANK() OVER (ORDER BY NullCountPercentage ASC, CASE WHEN TableRowCount > 60000000 AND UniqueValueCount >= 600 THEN -1*UniqueValueCountPercentage ELSE UniqueValueCountPercentage END DESC, UniqueValueCount DESC, ABS(LEN(''' \t\t\t,@TableNameStaging \t\t\t,''') - LEN(ColName)))/10.0 \t\t\t\t\t\t\tAS WeightedScore \t\t\t\t\t\tFROM \t\t\t\t\t\t( \t\t\t\t\t\t\tSELECT\t* \t\t\t\t\t\t\t\t\t,NullCount/(NULLIF(TableRowCount, 0)*1.0) AS NullCountPercentage \t\t\t\t\t\t\t\t\t,UniqueValueCount/(NULLIF(TableRowCount, 0)*1.0) AS UniqueValueCountPercentage \t\t\t\t\t\t\tFROM (' \t\t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), CONCAT ( \t\t\t\t\t\t'SELECT ''' \t\t\t\t\t\t,COLUMN_NAME \t\t\t\t\t\t,''' AS ColName, ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' AS DataTypeName \t\t\t\t\t\t\t\t,CASE \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''varchar'', ''char'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CASE WHEN ' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' = -1 THEN ''max'' ELSE CAST(' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) END + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''nvarchar'',''nchar'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CASE WHEN ' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' = -1 THEN ''max'' ELSE CAST(' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' / 2 AS VARCHAR(25)) END + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''decimal'', ''numeric'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '', '' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''datetime2'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '')'' \t\t\t\t\t\t\t\t\t ELSE ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' \t\t\t\t\t\t\t\t\tEND AS DataTypeFull \t\t\t\t\t\t\t\t,' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS CharacterLength \t\t\t\t\t\t\t\t,' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS PrecisionValue, ' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS ScaleValue \t\t\t\t\t\t\t\t,COUNT_BIG(DISTINCT ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,') AS UniqueValueCount, COALESCE(SUM(CONVERT(BIGINT, CASE WHEN ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' IS NULL THEN 1 ELSE 0 END)), CONVERT(BIGINT, 0)) AS NullCount \t\t\t\t\t\t \t\t\t\t\t\t,COALESCE(CONVERT(NVARCHAR(MAX), MIN(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), '''') AS [MinValue] \t\t\t\t\t\t,COALESCE(CONVERT(NVARCHAR(MAX), MAX(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), '''') AS [MaxValue] \t\t\t\t\t\t,COALESCE(MIN(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), 0) AS [MinLength] \t\t\t\t\t\t,COALESCE(MAX(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), 0) AS [MaxLength] \t\t\t\t\t\t \t\t\t\t\t\t \t\t\t\t\t\t,AVG(CASE WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''char'', ''varchar'', ''nvarchar'') THEN CONVERT(NUMERIC(30,2), NULL) ELSE ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' END) AS DataAverage \t\t\t\t\t\t \t\t\t\t\t\t,STDEVP(CASE WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''char'', ''varchar'', ''nvarchar'') THEN CONVERT(FLOAT, NULL) ELSE ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' END) AS DataStdevp\t\t\t\t\t\t,' \t\t\t\t\t\t,@RowCount \t\t\t\t\t\t,' AS TableRowCount,' \t\t\t\t\t\t,@TableDataSpaceGB \t\t\t\t\t\t,' AS TableDataSpaceGB \t\t\t\t\t\t\t\tFROM ' \t\t\t\t\t\t,QUOTENAME(TABLE_SCHEMA) \t\t\t\t\t\t,'.' \t\t\t\t\t\t,QUOTENAME(TABLE_NAME) \t\t\t\t\t\t,'' \t\t\t\t\t\t)), ' UNION ') \t\t\t,') AS a) AS a' \t\t\t,' OPTION (LABEL = ''' \t\t\t,CONCAT ( \t\t\t\t'Data Profile - ' \t\t\t\t,QUOTENAME(@SchemaNameStaging) \t\t\t\t,'.' \t\t\t\t,QUOTENAME(@TableNameStaging) \t\t\t\t,' - {MasterPipelineId}' \t\t\t\t) \t\t\t,''')' \t\t\t) AS sqlString \tFROM INFORMATION_SCHEMA.COLUMNS \tWHERE TABLE_SCHEMA = @SchemaNameStaging \t\tAND TABLE_NAME = @TableNameStaging \t\tAND ( \t\t\tDATA_TYPE IN ( \t\t\t\t'int' \t\t\t\t,'bigint' \t\t\t\t,'char' \t\t\t\t,'varchar' \t\t\t\t,'nvarchar' \t\t\t\t) \t\t\tOR NUMERIC_SCALE = 0 \t\t\t) /*AND COLUMN_NAME NOT LIKE '%date%' \t\tAND COLUMN_NAME NOT LIKE '%time%'*/ \t) AS a;/*SELECT @sql*/ EXEC (@sql); INSERT INTO logging.DataProfile SELECT '{MasterPipelineId}' AS MasterPipelineId \t,'{MasterPipelineStartDate}' AS MasterPipelineStartDate \t,'{MasterPipelineStartDateTime}' AS MasterPipelineStartDateTime \t,@SchemaNameStaging AS SchemaName \t,@TableNameStaging AS TableName \t,ColName \t,DataTypeName \t,DataTypeFull \t,CharacterLength \t,PrecisionValue \t,ScaleValue \t,UniqueValueCount \t,NullCount \t,MinValue \t,MaxValue \t,MinLength \t,MaxLength \t,DataAverage \t,DataStdevp \t,TableRowCount \t,TableDataSpaceGB \t,CONCAT ( \t\t@sql \t\t,'; SELECT * FROM #temp ORDER BY WeightedScore' \t\t) AS SqlCommand \t,GETDATE() AS RowInsertDateTime FROM #temp; IF @RowCount >= 60000000 BEGIN /*Run this if row count is greater than 60M and data size on disk is less that 2GB*/ \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = HASH('', ColName, ''), CLUSTERED COLUMNSTORE INDEX ) '') FROM #temp WHERE ColName NOT LIKE ''%date%'' AND ColName NOT LIKE ''%time%'' ORDER BY WeightedScore' \tFROM #temp \tWHERE ( \t\t\tColName NOT LIKE '%time%' \t\t\tAND ColName NOT LIKE '%date%' \t\t\t) /*Will need to be tweaked*/ \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END ELSE IF @RowCount < 60000000 \tAND @TableDataSpaceGB > 2 BEGIN \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = HASH('', ColName, ''), CLUSTERED INDEX('', ColName, '') ) '') FROM #temp ORDER BY WeightedScore' \tFROM #temp \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END ELSE BEGIN \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = REPLICATE, CLUSTERED INDEX('', ColName, '') ) '') FROM #temp ORDER BY WeightedScore' \tFROM #temp \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END SELECT @Distribution AS DistributionIndex;" + "defaultValue": "EXECUTE AS USER = '{ResourceClassName}' DECLARE @SchemaNameStaging NVARCHAR(MAX) = '{SchemaNameStaging}' \t,@TableNameStaging NVARCHAR(MAX) = '{TableNameStaging}' \t,@RowCount BIGINT \t,@TableDataSpaceGB DECIMAL(36, 2); WITH base AS ( \tSELECT s.name AS [schema_name] \t\t,t.name AS [table_name] \t\t,QUOTENAME(s.name) + '.' + QUOTENAME(t.name) AS [two_part_name] \t\t,tp.[distribution_policy_desc] AS [distribution_policy_name] \t\t,c.[name] AS [distribution_column] \t\t,i.[type_desc] AS [index_type_desc] \t\t,[reserved_page_count] AS reserved_space_page_count \t\t,[used_data] AS data_space_page_count \t\t,([reserved_page_count] - ([used_data] + ([reserved_page_count] - [used_page_count]))) AS index_space_page_count \t\t,([reserved_page_count] - [used_page_count]) AS unused_space_page_count \t\t,nps.[row_count] AS [row_count] \tFROM sys.schemas s \tJOIN sys.tables t ON s.[schema_id] = t.[schema_id] \tJOIN sys.indexes i ON t.[object_id] = i.[object_id] \t\tAND i.[index_id] <= 1 \tJOIN sys.pdw_table_distribution_properties tp ON t.[object_id] = tp.[object_id] \tJOIN sys.pdw_table_mappings tm ON t.[object_id] = tm.[object_id] \tJOIN sys.pdw_nodes_tables nt ON tm.[physical_name] = nt.[name] \tJOIN sys.dm_pdw_nodes pn ON nt.[pdw_node_id] = pn.[pdw_node_id] \tJOIN sys.pdw_distributions di ON nt.[distribution_id] = di.[distribution_id] \tJOIN ( \t\tSELECT [object_id] \t\t\t,[pdw_node_id] \t\t\t,[distribution_id] \t\t\t,SUM(row_count) AS row_count \t\t\t,SUM([reserved_page_count]) AS [reserved_page_count] \t\t\t,SUM([used_data]) AS [used_data] \t\t\t,SUM([used_page_count]) AS [used_page_count] \t\tFROM ( \t\t\tSELECT [object_id] \t\t\t\t,[pdw_node_id] \t\t\t\t,[distribution_id] \t\t\t\t,MAX(row_count) AS row_count \t\t\t\t,SUM([reserved_page_count]) AS [reserved_page_count] \t\t\t\t,MAX([used_page_count]) AS [used_data] \t\t\t\t,SUM([used_page_count]) AS [used_page_count] \t\t\tFROM sys.dm_pdw_nodes_db_partition_stats \t\t\tGROUP BY [object_id] \t\t\t\t,[pdw_node_id] \t\t\t\t,[distribution_id] \t\t\t) AS a \t\tGROUP BY [object_id] \t\t\t,[pdw_node_id] \t\t\t,[distribution_id] \t\t) AS nps ON nt.[object_id] = nps.[object_id] \t\tAND nt.[pdw_node_id] = nps.[pdw_node_id] \t\tAND nt.[distribution_id] = nps.[distribution_id] \tLEFT JOIN ( \t\tSELECT * \t\tFROM sys.pdw_column_distribution_properties \t\tWHERE distribution_ordinal = 1 \t\t) cdp ON t.[object_id] = cdp.[object_id] \tLEFT JOIN sys.columns c ON cdp.[object_id] = c.[object_id] \t\tAND cdp.[column_id] = c.[column_id] \tWHERE pn.[type] = 'COMPUTE' \t\tAND s.name = @SchemaNameStaging \t\tAND t.name = @TableNameStaging \t) \t,size AS ( \tSELECT [schema_name] \t\t,[table_name] \t\t,[distribution_policy_name] \t\t,[distribution_column] \t\t,[index_type_desc] \t\t,[index_space_page_count] \t\t,([reserved_space_page_count] * 8.0) / 1000000 AS [reserved_space_GB] \t\t,([unused_space_page_count] * 8.0) / 1000000 AS [unused_space_GB] \t\t,([data_space_page_count] * 8.0) / 1000000 AS [data_space_GB] \t\t,([index_space_page_count] * 8.0) / 1000000 AS [index_space_GB] \t\t,[row_count] AS row_count \tFROM base \t) SELECT @RowCount = SUM(row_count) /*AS [RowCount]*/ \t,@TableDataSpaceGB = SUM(data_space_GB) /*AS TableDataSpaceGB*/ FROM size GROUP BY schema_name \t,table_name \t,distribution_policy_name \t,distribution_column \t,index_type_desc ORDER BY schema_name \t,table_name \t,SUM(reserved_space_GB) DESC;/*SELECT @RowCount, @TableDataSpaceGB*/ DECLARE @sql NVARCHAR(MAX) \t,@Distribution NVARCHAR(MAX) \t,@IndexType NVARCHAR(MAX) SELECT @sql = sqlString FROM ( \tSELECT CONCAT ( \t\t\t'IF OBJECT_ID(''tempdb..#temp'') IS NOT NULL DROP TABLE #temp; CREATE TABLE #temp WITH (DISTRIBUTION = ROUND_ROBIN, HEAP) AS \t\t\t\t\tSELECT \t\t\t\t\t\t\t* \t\t\t\t\t\t\t,CASE WHEN DataTypeName = ''int'' THEN 1 \t\t\t\t\t\t\t\tWHEN DataTypeName IN (''numeric'', ''decimal'') AND ScaleValue = 0 THEN 1 \t\t\t\t\t\t\t\tWHEN DataTypeName = ''bigint'' THEN 2 \t\t\t\t\t\t\t\tWHEN DataTypeName = ''smallint'' THEN 2 \t\t\t\t\t\t\t\tWHEN DataTypeName IN (''char'', ''varchar'', ''nvarchar'') \t\t\t\t\t\t\t\t\tTHEN \t\t\t\t\t\t\t\t\t\tCASE WHEN CharacterLength <= 5 THEN 3 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 10 THEN 4 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 20 THEN 6 \t\t\t\t\t\t\t\t\t\t\t\tWHEN CharacterLength <= 50 THEN 8 \t\t\t\t\t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t+ \t\t\t\t\t\t\tCASE WHEN NullCountPercentage < 0.01 THEN 1 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.02 THEN 2 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.03 THEN 3 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.04 THEN 4 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.05 THEN 5 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.06 THEN 6 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.07 THEN 7 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.08 THEN 8 \t\t\t\t\t\t\t\tWHEN NullCountPercentage < 0.09 THEN 9 \t\t\t\t\t\t\t\tELSE 10 \t\t\t\t\t\t\t\tEND \t\t\t\t\t\t\t+ \t\t\t\t\t\t\tCASE WHEN UniqueValueCount > 60 AND UniqueValueCountPercentage > 0.000001 THEN 1 ELSE 3\t\t\t\t\t\t\tEND + RANK() OVER (ORDER BY NullCountPercentage ASC, CASE WHEN TableRowCount > 60000000 AND UniqueValueCount >= 600 THEN -1*UniqueValueCountPercentage ELSE UniqueValueCountPercentage END DESC, UniqueValueCount DESC, ABS(LEN(''' \t\t\t,@TableNameStaging \t\t\t,''') - LEN(ColName)))/10.0 \t\t\t\t\t\t\tAS WeightedScore \t\t\t\t\t\tFROM \t\t\t\t\t\t( \t\t\t\t\t\t\tSELECT\t* \t\t\t\t\t\t\t\t\t,NullCount/(NULLIF(TableRowCount, 0)*1.0) AS NullCountPercentage \t\t\t\t\t\t\t\t\t,UniqueValueCount/(NULLIF(TableRowCount, 0)*1.0) AS UniqueValueCountPercentage \t\t\t\t\t\t\tFROM (' \t\t\t,STRING_AGG(CONVERT(NVARCHAR(MAX), CONCAT ( \t\t\t\t\t\t'SELECT ''' \t\t\t\t\t\t,COLUMN_NAME \t\t\t\t\t\t,''' AS ColName, ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' AS DataTypeName \t\t\t\t\t\t\t\t,CASE \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''varchar'', ''char'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CASE WHEN ' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' = -1 THEN ''max'' ELSE CAST(' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) END + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''nvarchar'',''nchar'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CASE WHEN ' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' = -1 THEN ''max'' ELSE CAST(' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' / 2 AS VARCHAR(25)) END + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''decimal'', ''numeric'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '', '' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '')'' \t\t\t\t\t\t\t\t\t WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''datetime2'') THEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' + ''('' + CAST(' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS VARCHAR(25)) + '')'' \t\t\t\t\t\t\t\t\t ELSE ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' \t\t\t\t\t\t\t\t\tEND AS DataTypeFull \t\t\t\t\t\t\t\t,' \t\t\t\t\t\t,COALESCE(CAST(CHARACTER_MAXIMUM_LENGTH AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS CharacterLength \t\t\t\t\t\t\t\t,' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_PRECISION AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS PrecisionValue, ' \t\t\t\t\t\t,COALESCE(CAST(NUMERIC_SCALE AS VARCHAR(10)), 'NULL') \t\t\t\t\t\t,' AS ScaleValue \t\t\t\t\t\t\t\t,COUNT_BIG(DISTINCT ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,') AS UniqueValueCount, COALESCE(SUM(CONVERT(BIGINT, CASE WHEN ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' IS NULL THEN 1 ELSE 0 END)), CONVERT(BIGINT, 0)) AS NullCount \t\t\t\t\t\t \t\t\t\t\t\t,COALESCE(CONVERT(NVARCHAR(MAX), MIN(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), '''') AS [MinValue] \t\t\t\t\t\t,COALESCE(CONVERT(NVARCHAR(MAX), MAX(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), '''') AS [MaxValue] \t\t\t\t\t\t,COALESCE(MIN(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), 0) AS [MinLength] \t\t\t\t\t\t,COALESCE(MAX(DATALENGTH(' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,')), 0) AS [MaxLength] \t\t\t\t\t\t \t\t\t\t\t\t \t\t\t\t\t\t,AVG(CASE WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''char'', ''varchar'', ''nvarchar'') THEN CONVERT(NUMERIC(30,2), NULL) ELSE ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' END) AS DataAverage \t\t\t\t\t\t \t\t\t\t\t\t,STDEVP(CASE WHEN ''' \t\t\t\t\t\t,DATA_TYPE \t\t\t\t\t\t,''' IN (''char'', ''varchar'', ''nvarchar'') THEN CONVERT(FLOAT, NULL) ELSE ' \t\t\t\t\t\t,QUOTENAME(COLUMN_NAME) \t\t\t\t\t\t,' END) AS DataStdevp\t\t\t\t\t\t,' \t\t\t\t\t\t,@RowCount \t\t\t\t\t\t,' AS TableRowCount,' \t\t\t\t\t\t,@TableDataSpaceGB \t\t\t\t\t\t,' AS TableDataSpaceGB \t\t\t\t\t\t\t\tFROM ' \t\t\t\t\t\t,QUOTENAME(TABLE_SCHEMA) \t\t\t\t\t\t,'.' \t\t\t\t\t\t,QUOTENAME(TABLE_NAME) \t\t\t\t\t\t,'' \t\t\t\t\t\t)), ' UNION ') \t\t\t,') AS a) AS a' \t\t\t,' OPTION (LABEL = ''' \t\t\t,CONCAT ( \t\t\t\t'Data Profile - ' \t\t\t\t,QUOTENAME(@SchemaNameStaging) \t\t\t\t,'.' \t\t\t\t,QUOTENAME(@TableNameStaging) \t\t\t\t,' - {PipelineRunId}' \t\t\t\t) \t\t\t,''')' \t\t\t) AS sqlString \tFROM INFORMATION_SCHEMA.COLUMNS \tWHERE TABLE_SCHEMA = @SchemaNameStaging \t\tAND TABLE_NAME = @TableNameStaging \t\tAND ( \t\t\tDATA_TYPE IN ( \t\t\t\t'int' \t\t\t\t,'bigint' \t\t\t\t,'char' \t\t\t\t,'varchar' \t\t\t\t,'nvarchar' \t\t\t\t) \t\t\tOR NUMERIC_SCALE = 0 \t\t\t) /*AND COLUMN_NAME NOT LIKE '%date%' \t\tAND COLUMN_NAME NOT LIKE '%time%'*/ \t) AS a;/*SELECT @sql*/ EXEC (@sql); INSERT INTO logging.DataProfile SELECT '{PipelineRunId}' AS PipelineRunId \t,'{PipelineStartDate}' AS PipelineStartDate \t,'{PipelineStartDateTime}' AS PipelineStartDateTime \t,@SchemaNameStaging AS SchemaName \t,@TableNameStaging AS TableName \t,ColName \t,DataTypeName \t,DataTypeFull \t,CharacterLength \t,PrecisionValue \t,ScaleValue \t,UniqueValueCount \t,NullCount \t,MinValue \t,MaxValue \t,MinLength \t,MaxLength \t,DataAverage \t,DataStdevp \t,TableRowCount \t,TableDataSpaceGB \t,WeightedScore \t,CONCAT ( \t\t@sql \t\t,'; SELECT * FROM #temp ORDER BY WeightedScore' \t\t) AS SqlCommand \t,GETDATE() AS RowInsertDateTime FROM #temp; IF @RowCount >= 60000000 BEGIN /*Run this if row count is greater than 60M and data size on disk is less that 2GB*/ \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = HASH('', ColName, ''), CLUSTERED COLUMNSTORE INDEX ) '') FROM #temp WHERE ColName NOT LIKE ''%date%'' AND ColName NOT LIKE ''%time%'' ORDER BY WeightedScore' \tFROM #temp \tWHERE ( \t\t\tColName NOT LIKE '%time%' \t\t\tAND ColName NOT LIKE '%date%' \t\t\t) /*Will need to be tweaked*/ \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END ELSE IF @RowCount < 60000000 \tAND @TableDataSpaceGB > 2 BEGIN \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = HASH('', ColName, ''), CLUSTERED INDEX('', ColName, '') ) '') FROM #temp ORDER BY WeightedScore' \tFROM #temp \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END ELSE BEGIN \tSELECT @sql = 'SELECT TOP 1 @Distribution = CONCAT('' WITH ( DISTRIBUTION = REPLICATE, CLUSTERED INDEX('', ColName, '') ) '') FROM #temp ORDER BY WeightedScore' \tFROM #temp \tEXEC sp_executesql @Query = @sql \t\t,@Params = N'@Distribution NVARCHAR(MAX) OUTPUT' \t\t,@Distribution = @Distribution OUTPUT END SELECT @Distribution AS DistributionIndex;" }, "DatabaseName": { "type": "String", - "defaultValue": "REPLACE_SYNAPSE_ANALYTICS_SQL_POOL_NAME" + "defaultValue": "DataWarehouse" } }, "annotations": [] diff --git a/deploySynapse.sh b/deploySynapse.sh index 1dbeb6b..98104c7 100644 --- a/deploySynapse.sh +++ b/deploySynapse.sh @@ -203,9 +203,17 @@ azcopy cp 'artifacts/Parquet_Auto_Ingestion_Metadata.csv' 'https://'"${datalakeN sampleDataStorageSAS="?sv=2020-10-02&st=2021-11-23T05%3A00%3A00Z&se=2022-11-24T05%3A00%3A00Z&sr=c&sp=rl&sig=PMi22pEYzw1dHNrOI9gqrwcbi3AJLq%2BxWoSX9SOTLuw%3D" azcopy cp 'https://synapseanalyticspocdata.blob.core.windows.net/sample/AdventureWorks/'"${sampleDataStorageSAS}" 'https://'"${datalakeName}"'.blob.core.windows.net/data/Sample?'"${destinationStorageSAS}" --recursive >> deploySynapse.log 2>&1 -# Create the Auto_Pause_and_Resume Pipeline in the Synapse Analytics Workspace +# Create the Auto Ingestion Pipeline in the Synapse Analytics Workspace az synapse pipeline create --only-show-errors -o none --workspace-name ${synapseAnalyticsWorkspaceName} --name "Parquet Auto Ingestion" --file @artifacts/Parquet_Auto_Ingestion.json >> deploySynapse.log 2>&1 +# Copy the Lake Database Auto DDL Pipeline template and update the variables +cp artifacts/Lake_Database_Auto_DDL.json.tmpl artifacts/Lake_Database_Auto_DDL.json 2>&1 +sed -i "s/REPLACE_DATALAKE_NAME/${datalakeName}/g" artifacts/Lake_Database_Auto_DDL.json + +# Create the Lake Database Auto DDL Pipeline in the Synapse Analytics Workspace +az synapse pipeline create --only-show-errors -o none --workspace-name ${synapseAnalyticsWorkspaceName} --name "Lake Database Auto DDL" --file @artifacts/Lake_Database_Auto_DDL.json >> deploySynapse.log 2>&1 + + echo "Creating the Demo Data database using Synapse Serverless SQL..." | tee -a deploySynapse.log # Create a Demo Data database using Synapse Serverless SQL