You have a few options; I'll describe three here:
- If you have an Azure SQL DB nearby and the volumes aren't too large, use
CREATE EXTERNAL TABLE to surface the table from that Azure SQL DB, or simply copy the data across with Azure Data Factory (ADF), run the recursive CTE there, then port the results back with ADF. Alternatively, apply some kind of pre-processing before this data lands in your SQL pool.
- A recursive CTE is, at the end of the day, just a kind of loop, and Synapse does support
WHILE. Clearly this type of loop doesn't translate well to Synapse because it's chatty, but it could be an option for small volumes with shallow hierarchies. The trade-off between using the MPP architecture inefficiently this way and writing an alternative is yours to make.
I put together an example of option 2, and it took over 20 seconds to process just a few rows. Normally I would consider that unacceptable, but as mentioned, you can weigh it against the alternatives:
IF OBJECT_ID('dbo.someHierarchy') IS NOT NULL
DROP TABLE dbo.someHierarchy;
CREATE TABLE dbo.someHierarchy (
code INT NOT NULL,
[name] VARCHAR(50) NOT NULL,
under INT NOT NULL
)
WITH
(
DISTRIBUTION = ROUND_ROBIN,
HEAP
);
INSERT INTO dbo.someHierarchy ( code, [name], under )
SELECT 1, 'National Sales Manager', 1
UNION ALL
SELECT 2, 'Regional Sales Manager', 1
UNION ALL
SELECT 3, 'Area Sales Manager', 2
UNION ALL
SELECT 4, 'Sales Manager', 3
INSERT INTO dbo.someHierarchy ( code, [name], under )
SELECT 5, 'Lead Bob', 5
UNION ALL
SELECT 6, 'Main Bob', 5
UNION ALL
SELECT 7, 'Junior Bob 1', 6
UNION ALL
SELECT 8, 'Junior Bob 2', 6
INSERT INTO dbo.someHierarchy ( code, [name], under )
SELECT 9, 'Jim - CEO', 9
UNION ALL
SELECT 10, 'Tim - CFO', 9
UNION ALL
SELECT 11, 'Rob - CIO', 9
UNION ALL
SELECT 12, 'Bob - VP', 10
UNION ALL
SELECT 13, 'Shon - Director', 12
UNION ALL
SELECT 14, 'Shane - VP', 11
UNION ALL
SELECT 15, 'Sheryl - VP', 11
UNION ALL
SELECT 16, 'Dan - Director', 15
UNION ALL
SELECT 17, 'Kim - Director', 15
UNION ALL
SELECT 18, 'Carlo - PM', 16
UNION ALL
SELECT 19, 'Monty - Sr Dev', 18
UNION ALL
SELECT 20, 'Chris - Sr Dev', 18
IF OBJECT_ID('tempdb..#tmp') IS NOT NULL DROP TABLE #tmp;
CREATE TABLE #tmp (
xlevel INT NOT NULL,
code INT NOT NULL,
[name] VARCHAR(50) NOT NULL,
under INT NOT NULL,
ultimateParent INT NOT NULL
);
-- Insert first level; similar to anchor section of CTE
INSERT INTO #tmp ( xlevel, code, [name], under, ultimateParent )
SELECT 1 AS xlevel, code, [name], under, under AS ultimateParent
FROM dbo.someHierarchy
WHERE under = code;
-- Loop section
DECLARE @i INT = 1
WHILE EXISTS (
SELECT * FROM dbo.someHierarchy h
WHERE NOT EXISTS ( SELECT * FROM #tmp t WHERE h.code = t.code )
)
BEGIN
-- Insert subsequent levels; similar to recursive section of CTE
INSERT INTO #tmp ( xlevel, code, [name], under, ultimateParent )
SELECT t.xlevel + 1, h.code, h.[name], h.under, t.ultimateParent
FROM #tmp t
INNER JOIN dbo.someHierarchy h ON t.code = h.under
WHERE h.under != h.code
AND t.xlevel = @i;
-- Increment counter
SET @i += 1
-- Loop guard
IF @i > 99
BEGIN
RAISERROR( 'Too many loops!', 16, 1 )
BREAK
END
END
SELECT 'loop' s, *
FROM #tmp
ORDER BY code, xlevel;
Results:
The caveat is that the WHILE EXISTS loop is a particularly expensive approach, so perhaps there is a simpler way of working with your data.
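For intuition, the WHILE loop above is just a breadth-first expansion of the hierarchy, one level per pass. A minimal Python sketch of the same algorithm, using a hypothetical in-memory subset of the sample rows rather than the SQL pool:

```python
# Level-by-level expansion of a (code, name, under) hierarchy,
# mirroring the T-SQL loop: anchor on the roots, then attach one level per pass.
rows = [
    (1, "National Sales Manager", 1),
    (2, "Regional Sales Manager", 1),
    (3, "Area Sales Manager", 2),
    (9, "Jim - CEO", 9),
    (10, "Tim - CFO", 9),
    (12, "Bob - VP", 10),
]

# Anchor section: roots report to themselves (under = code)
result = {code: (1, code, name, under, under)  # (xlevel, code, name, under, ultimateParent)
          for code, name, under in rows if code == under}

# "Recursive" section: attach children of the previous level,
# carrying ultimateParent down the tree
level = 1
while True:
    added = False
    for code, name, under in rows:
        if code not in result and under in result and result[under][0] == level:
            result[code] = (level + 1, code, name, under, result[under][4])
            added = True
    if not added:
        break
    level += 1
    if level > 99:  # loop guard, as in the T-SQL version
        raise RuntimeError("Too many loops!")

for row in sorted(result.values(), key=lambda r: r[1]):
    print(row)
```

Same shape as the `#tmp` table: each iteration only joins rows whose parent was resolved in the previous pass.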
A third option is to use an Azure Synapse Notebook and a library like GraphFrames to traverse the hierarchy. There are simpler ways to do this, but I found the Connected Components approach capable of determining the ultimate manager. One advantage of using GraphFrames is that it allows more complex graph queries, for example with motifs if required.
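For intuition on why Connected Components identifies the ultimate manager here: it gives every vertex in the same reporting tree the same component id, a representative id for the group (in this sample data, the ultimate manager happens to have the smallest code in each tree). A minimal union-find sketch in Python, as an in-memory stand-in for the algorithm rather than the GraphFrames API:

```python
# Union-find over (code, under) edges: every employee in the same
# reporting tree ends up with the same representative id.
def find(parent, x):
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path halving
        x = parent[x]
    return x

def union(parent, a, b):
    ra, rb = find(parent, a), find(parent, b)
    if ra != rb:
        parent[max(ra, rb)] = min(ra, rb)  # keep the smallest id as representative

# Edges from the sample data, self-references (roots) excluded
edges = [(2, 1), (3, 2), (4, 3),        # sales tree rooted at 1
         (6, 5), (7, 6), (8, 6),        # Bob tree rooted at 5
         (10, 9), (11, 9), (12, 10)]    # exec tree rooted at 9

parent = {n: n for e in edges for n in e}
for src, dst in edges:
    union(parent, src, dst)

components = {n: find(parent, n) for n in parent}
print(components)
```

Each of the three trees collapses to one component, which is exactly what the notebook below uses as `ultimateManager`.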
This notebook uses the Spark (Scala) kernel:
Upload the correct version of the GraphFrames library to Spark:
%%configure -f
{
"conf": {
"spark.jars": "abfss://{yourContainer}@{yourDataLake}.dfs.core.windows.net/synapse/workspaces/{yourWorkspace}/sparkpools/{yourSparkpool}/libraries/graphframes-0.8.1-spark2.4-s_2.11.jar",
}
}
Configure the elements in curly braces for your environment.
Import the relevant libraries:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Get the data from the dedicated SQL pool and assign it to a dataframe:
// Get a table from Synapse dedicated SQL pool, select / rename certain columns from it to vertices and edge dataframes
val df = spark.read.synapsesql("yourDB.dbo.someHierarchy")
val v = df.selectExpr("code AS id", "name AS empName", "under")
v.show
// Reformat the code/under relationship from the original table
// NB Exclude self-references (code = under) because in graph terms these don't have an edge
val e = df.selectExpr("code AS src", "under AS dst", "'under' AS relationship").where("code != under")
e.show
Create the GraphFrame from the vertices and edges dataframes:
// Create the graph frame
val g = GraphFrame(v, e)
print(g)
Set a checkpoint for connectedComponents:
// The connected components adds a component id to each 'group'
// Set a checkpoint to start
sc.setCheckpointDir("/tmp/graphframes-azure-synapse-notebook")
Run the connected components algorithm against the data:
// Run connected components algorithm against the data
val cc = g.connectedComponents.run() // doesn't work on Spark 1.4
display(cc)
Join the original vertices dataframe with the results of the connected components algorithm and write it back to the Azure Synapse dedicated SQL pool. Note the dataframes must be registered as temp views before they can be referenced from Spark SQL:
// Register the dataframes so Spark SQL can see them as v and cc
v.createOrReplaceTempView("v")
cc.createOrReplaceTempView("cc")
val writeDf = spark.sql("SELECT v.id, v.empName, v.under, cc.component AS ultimateManager FROM v INNER JOIN cc ON v.id = cc.id")
//display(writeDf)
writeDf.write.synapsesql("someDb.dbo.someHierarchy2", Constants.INTERNAL)
Results:
I have a feeling there is a simpler way to do this with a notebook, but I look forward to seeing some alternatives. Vote for the feedback item for recursive CTEs on Synapse here:
https://feedback.azure.com/forums/307516-azure-synapse-analytics/suggestions/14876727-support-for-recursive-cte