Microsoft is leveraging Azure to enable extensibility across Dynamics 365 and Power Apps by making data easily accessible to customers via the Export to Azure Data Lake feature. Before that could happen, Microsoft needed to collaborate with other industry heavyweights to create a data format standard, the Common Data Model (CDM), which is the format used to save data in the Data Lake.
In this article, we are going to focus on how to read and write data saved in the CDM format: a hierarchy of folders that stores the data files in CSV and the schema in JSON. Any compute type can access this data as long as it knows how to query the CSV files while fetching their schema from the model.json or the newer manifest.cdm.json files.
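To make that layout concrete, here is a simplified sketch of what a model.json-based CDM folder might look like; the folder and entity names below are placeholders, the real names depend on the source system, and a manifest.cdm.json-based folder adds a few more files:

ExampleArea/
    ExampleEntitySet/
        model.json            <- schema: entity names, attributes, partition locations
        ExampleEntity/
            part00000.csv     <- data partition(s) referenced from model.json

And a heavily trimmed, illustrative model.json showing only the properties the scripts later in this article rely on ($.name, $.modifiedTime, $.entities, $.attributes, $.partitions):

{
  "name": "ExampleEntitySet",
  "modifiedTime": "2021-03-30T00:00:00Z",
  "entities": [
    {
      "name": "ExampleEntity",
      "attributes": [ { "name": "CustomerId", "dataType": "string" } ],
      "partitions": [ { "location": "https://<storageAccount>.dfs.core.windows.net/<container>/.../part00000.csv" } ]
    }
  ]
}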
To begin this process, we need to export our data to the data lake from either Dataverse or Dynamics 365 Finance and Operations (F&O). The reason the other Dynamics 365 and Power Apps products are not called out is that they use Dataverse as their data backend, while F&O uses an Azure SQL database in the backend. For folks struggling to grasp what Dataverse is, it is the new branding of the Common Data Service (CDS), and it can be thought of as database as a service. Apps and users interface with Dataverse through a set of API calls. Under the hood, the product is a combination of Azure services such as Azure SQL Database, Azure Cosmos DB, Azure Blob Storage, Azure Search, Azure Functions, Azure Service Bus, Azure Event Hubs, and additional components such as App Service and load balancers. These services are orchestrated and managed by Microsoft, which hides the complexity. Users, however, want access to their data for further analytics and reporting, which is enabled by the export to data lake option and benefits all Dynamics 365 and Power Apps products that use Dataverse as their data backend.
For the ERP system, F&O’s export to data lake is a work in progress. It is currently being tested with a select set of customers in private preview; after that, it should open up to more testers in public preview before becoming generally available. We will get ahead of the public preview so we have the knowledge and tools necessary to analyze that data once it lands in our data lake. The same learning applies to CDM data synced to the data lake from Dataverse, which is generally available, or even to Power BI dataflows. If you need CDM data to experiment with, you can use the SQL to lake solution to export SQL data to a data lake in CDM format, and you can leverage this limited time offer for Azure Synapse Analytics to save on cost. Once the data lands in the data lake in CDM format, it can be acted upon by different compute types such as Power BI dataflows, Azure Databricks, or Azure Synapse Analytics. We will focus on the latter, since it offers the capability of attaching a data lake to a workspace and provides two types of compute to read the data, transform it, and use it for ML model training, among several other use cases.
It’s time to share some code snippets that I gathered from different docs and modified as needed. We will start by reading CDM data from an attached data lake with Apache Spark using a Synapse notebook. In the Synapse Develop hub, click the plus sign to create a new notebook, then copy and paste each code snippet into its own cell.
# Specifying appid, appkey and tenantid is optional in spark-cdm-connector-assembly-0.16.jar
# with a Premium Databricks cluster and Synapse
appid = "<appId>"
appkey = "<appKey>"
tenantid = "<tenantId>"
storageAccountName = "storageAccount.dfs.core.windows.net"
# Reading the content of the entity SalesCube_Customers into the readDf dataframe
readDf = (spark.read.format("com.microsoft.cdm")
    .option("storage", storageAccountName)
    .option("manifestPath", "/dynamics365-financeandoperations/lm-d365-xxxxxxxxxxxx.cloudax.dynamics.com/AggregateMeasurements/SalesCube/model.json")
    .option("entity", "SalesCube_Customers")
    .load())
readDf.select("*").show()
# Create a Spark SQL database and write the contents of our dataframe to the Customers table.
# Please note that this database and its content can be read with the built-in SQL pool compute.
spark.sql("CREATE DATABASE IF NOT EXISTS financeopdb")
readDf.write.mode("overwrite").saveAsTable("financeopdb.Customers")
%%sql
-- Changing the context from PySpark to SQL and reading the content of the newly created table
select * from financeopdb.Customers
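As the comment in the previous cell notes, the Spark database is also visible to the built-in serverless SQL pool through Synapse shared metadata. A minimal sketch of querying it from a SQL script, assuming the default Parquet format that saveAsTable uses and the financeopdb database name from the snippet above (Spark tables are exposed under the dbo schema):

-- Run from a SQL script connected to the built-in serverless SQL pool
SELECT TOP 10 *
FROM financeopdb.dbo.Customers;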
That was pretty straightforward; if you need to transform the data before writing it back, add the appropriate code to do so. In this example, we wrote data to a Spark SQL database that can be accessed by the built-in SQL compute or even Power BI. If you need to write the dataframe back to storage in CDM format, follow these instructions.
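As a rough sketch of that write path: the same com.microsoft.cdm connector used for reading can also write a dataframe back to the lake in CDM format. The manifest path and entity name below are placeholders I made up for illustration, and option support can vary between connector versions, so treat this as a sketch rather than a definitive recipe:

# Illustrative only: write the dataframe back to the attached data lake in CDM format
# using the same com.microsoft.cdm connector. Paths and entity names are placeholders.
(readDf.write.format("com.microsoft.cdm")
    .option("storage", storageAccountName)
    .option("manifestPath", "<containerName>/cdm-output/default.manifest.cdm.json")
    .option("entity", "Customers")
    .mode("overwrite")
    .save())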
To read CDM data with the built-in SQL compute (on-demand), we will use these scripts from Vesa Tikkanen’s GitHub repo. They were written for reading Power BI dataflow CDM folders, but they can be repurposed for any CDM data produced by Dataverse or Dynamics 365 F&O. I made some slight modifications to the scripts to account for differences in the folder hierarchy and changed a data type that Synapse was complaining about. Below, I will paste the three scripts, which correspond to the three steps that come right after creating a schema.
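The scripts below assume a dynamixfo schema already exists in a serverless SQL database. If you are starting from scratch, something along these lines gets you there; the database name here is just an example:

-- Run against the built-in serverless SQL pool; the database name is an example
CREATE DATABASE dynamixfodb;
GO
USE dynamixfodb;
GO
CREATE SCHEMA dynamixfo;
GO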
Step 1: Create a view with information about entities and their model.json
IF EXISTS(select 1 from sys.views as v
          INNER JOIN sys.schemas as s ON v.schema_id=s.schema_id
          where v.name='MyDynamixModels' and s.name='dynamixfo')
BEGIN
    DROP VIEW [dynamixfo].[MyDynamixModels];
END
GO
CREATE VIEW [dynamixfo].[MyDynamixModels] AS
SELECT
    /* Originally Power BI Dataflow reader for Synapse. Reader created by Vesa Tikkanen */
    [result].filepath(1) AS [rootfolderName]
    ,[result].filepath(2) AS [EntityOrCubeName]
    ,JSON_VALUE(jsoncontent, '$.name') as EntityCubeName
    ,convert(datetime2, JSON_VALUE(jsoncontent, '$.modifiedTime')) as modifiedTime
    ,JSON_VALUE(entity.Value, '$.name') as entityName
    ,JSON_VALUE(partitions.Value, '$.location') as FileName
    ,JSON_VALUE(attributes.Value, '$.name') as ColumnName
    ,JSON_VALUE(attributes.Value, '$.dataType') as ColumnDataType
    ,CASE
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='date'     then 'date'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='dateTime' then 'datetime2'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='int64'    then 'bigint'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='string'   then 'nvarchar(4000)'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='decimal'  then 'decimal(18,6)'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='boolean'  then 'bit'
        WHEN JSON_VALUE(attributes.Value, '$.dataType')='double'   then 'float'
        ELSE JSON_VALUE(attributes.Value, '$.dataType')
     END as ColumnDataTypeSQL
    ,[result].filepath() as modelFileName
FROM OPENROWSET(
        -- HERE CHANGE YOUR Azure Data Lake Gen2 account that you're using for your dataflows
        BULK 'https://storageAccount.dfs.core.windows.net/ContainerName/lm-d365-xxxxxxxxxxxxxx.cloudax.dynamics.com/*/*/model.json',
        FORMAT = 'CSV',
        FIELDQUOTE = '0x0b',
        FIELDTERMINATOR = '0x0b',
        ROWTERMINATOR = '0x0b'
    )
    WITH (
        jsonContent varchar(MAX)
    ) AS [result]
CROSS APPLY OPENJSON(JSON_QUERY(jsoncontent, '$.entities')) as entity
CROSS APPLY OPENJSON(entity.Value, '$.attributes') as attributes
CROSS APPLY OPENJSON(entity.Value, '$.partitions') as partitions
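Once the view exists, a quick query against it shows which rootfolderName, EntityOrCubeName, and entityName combinations were discovered from the model.json files; these are exactly the three parameters you will pass to the stored procedure in steps 2 and 3:

-- List the entities discovered from the model.json files
SELECT DISTINCT rootfolderName, EntityOrCubeName, entityName
FROM [dynamixfo].[MyDynamixModels];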
Step 2: Create a stored procedure that takes three parameters and creates a view over the entity data
/****** Object: StoredProcedure [dynamixfo].[spcreateModelView] Script Date: 3.30.2021 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dynamixfo].[spcreateModelView]
    @rootfolderName nvarchar(200),
    @EntityOrCubeName nvarchar(200),
    @entityName nvarchar(200)
AS
BEGIN
    /* Power BI Dataflow reader for Synapse. Reader created by Vesa Tikkanen */
    DECLARE @columnsChar nvarchar(max) = '';
    DECLARE @columnsCharSel nvarchar(max) = '';

    -- check if the names of the workspace, dataflow or entity contain incompatible characters.
    IF CHARINDEX('[',@rootfolderName)>0 OR CHARINDEX(']',@rootfolderName)>0
       OR CHARINDEX('[',@EntityOrCubeName)>0 OR CHARINDEX(']',@EntityOrCubeName)>0
       OR CHARINDEX('[',@entityName)>0 OR CHARINDEX(']',@entityName)>0
    BEGIN
        PRINT 'Escape characters detected. Cannot create.'
        RETURN
    END

    select @columnsChar = STRING_AGG(colname,', ')
    from (
        select '[' + ColumnName + '] ' +
               CASE when ColumnDataTypeSQL='datetime2' then 'nvarchar(200)'
                    when ColumnDataTypeSQL='date' then 'nvarchar(200)'
                    else ColumnDataTypeSQL
               END as colname
        from [dynamixfo].[MyDynamixModels]
        where rootfolderName=@rootfolderName
          and EntityOrCubeName=@EntityOrCubeName
          and entityName=@entityName
    ) as a;

    select @columnsCharSel = STRING_AGG(colname,', ')
    from (
        select CASE when ColumnDataTypeSQL='datetime2' then 'convert(datetime2,[' + ColumnName + '],101) AS [' + ColumnName + '] '
                    when ColumnDataTypeSQL='date' then 'convert(date,[' + ColumnName + '],101) AS [' + ColumnName + '] '
                    else '[' + ColumnName + '] '
               END as colname
        from [dynamixfo].[MyDynamixModels]
        where rootfolderName=@rootfolderName
          and EntityOrCubeName=@EntityOrCubeName
          and entityName=@entityName
    ) as a;
    --select @columnsChar;

    DECLARE @sqlcmd nvarchar(max);
    SET @sqlcmd = N'DROP view if exists dynamixfo.[' + @rootfolderName + '_' + @EntityOrCubeName + '_' + @entityName + N'];';
    EXECUTE sp_executesql @sqlcmd;
    --select @sqlcmd;

    select @sqlcmd = N'CREATE view dynamixfo.[' + @rootfolderName + '_' + @EntityOrCubeName + '_' + @entityName + N'] AS' +
           STRING_AGG(subselect,' UNION ALL ')
    from (
        select N' SELECT ' + @columnsCharSel +
               N' FROM OPENROWSET( BULK ''' + REPLACE(FileName,'%20',' ') +
               N''', FORMAT = ''CSV'', PARSER_VERSION=''2.0'' ) WITH ( ' + @columnsChar + N' ) AS [result] ' as subselect
        from [dynamixfo].[MyDynamixModels]
        where rootfolderName=@rootfolderName
          and EntityOrCubeName=@EntityOrCubeName
          and entityName=@entityName
        group by FileName
    ) as a;
    --select @sqlcmd;

    EXECUTE sp_executesql @sqlcmd;
END
GO
Step 3: Execute the stored procedure
-- You can repeat this step for all the views that you need to create.
-- Note the examples are for Aggregate Measurements, but F&O data lake sync will expose raw tables as well
EXECUTE [dynamixfo].[spcreateModelView] 'AggregateMeasurements','Recruiting','Recruiting_Performance'
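The procedure names the resulting view by concatenating its three parameters, so the example above produces a view you can query directly:

-- Query the view generated by the stored procedure
SELECT TOP 100 *
FROM [dynamixfo].[AggregateMeasurements_Recruiting_Recruiting_Performance];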
In terms of data refresh, once the views are created in the serverless (SQL on-demand) database, they point at the data lake files, so as those files get updated by the data sync process, the views return the latest information when executed. I have tested this manually by changing data in the CSV files, and the views do pick up the changes. This solution can be used with the serverless SQL compute until the OPENROWSET function can handle the CDM format natively. There is a feature request under review, which we hope will make it to the roadmap, as the CDM format is becoming widely used across the Microsoft data estate.