-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: reduce.scala
More file actions
70 lines (44 loc) · 1.27 KB
/
reduce.scala
File metadata and controls
70 lines (44 loc) · 1.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import org.apache.spark.sql.functions.{col}
// Demonstrates List#reduce: combines the elements pairwise with a binary operator.
val numbers = List(1, 2, 3, 4, 5)

// Logs each (a, b) pair as reduce walks the list, then returns their sum.
val aBinaryOperator: (Int, Int) => Int = { (a, b) =>
  println(s"a = $a ; b = $b")
  a + b
}

// 1+2+3+4+5 = 15; reduce requires a non-empty list (this one is a literal).
val sum = numbers.reduce(aBinaryOperator)
// Static lookup table: one row per unit, keyed by UnitId, with its
// three-level hierarchy (UnitA / UnitB / UnitC).
val data1 = Seq(
  (1, "aaa", "aa", "a"),
  (2, "bbb", "bb", "b"),
  (3, "ccc", "cc", "c")
)

// Daily spend facts at (Level1, Level2, Level3) granularity; the level
// columns line up with UnitA/UnitB/UnitC in data1.
val data2 = Seq(
  ("2024-11-25", "aaa", "aa", "a", 100),
  ("2024-11-25", "bbb", "bb", "b", 120),
  ("2024-11-26", "aaa", "aa", "a", 140),
  ("2024-11-26", "ccc", "cc", "c", 90),
  ("2024-11-26", "bbb", "bb", "b", 110)
)

// toDF relies on spark.implicits being in scope (spark-shell provides it).
val df1 = data1.toDF("UnitId", "UnitA", "UnitB", "UnitC")
val df2 = data2.toDF("Date", "Level1", "Level2", "Level3", "Spend")
// APPROACH 1: spell out the three-column equi-join condition by hand.
// Column#&& is an alias of Column#and, so this condition is identical.
val joinCondition =
  (df1.col("UnitA") === df2.col("Level1")) &&
    (df1.col("UnitB") === df2.col("Level2")) &&
    (df1.col("UnitC") === df2.col("Level3"))

// Left join keeps every spend row even when no unit matches;
// Date/Spend come from df2, UnitId from df1, so no column is ambiguous.
val resultDF = df2
  .join(df1, joinCondition, "left")
  .select(col("Date"), col("UnitId"), col("Spend"))

resultDF.show()
// APPROACH 2: derive the join condition from a declarative column-pair
// mapping, so adding a level means adding one tuple instead of editing
// a hand-written boolean expression.
val correspondingColumns = Seq(
  ("Level1", "UnitA"),
  ("Level2", "UnitB"),
  ("Level3", "UnitC")
)

// Build one equality predicate per pair, then AND them all together.
// (Column#&& is an alias of Column#and; the Seq is a non-empty literal,
// so reduce is safe here.)
val joinCondition2 = correspondingColumns
  .map { case (leftCol, rightCol) => df2.col(leftCol) === df1.col(rightCol) }
  .reduce(_ && _)

// Same left join and projection as approach 1.
val resultDf = df2
  .join(df1, joinCondition2, "left")
  .select(col("Date"), col("UnitId"), col("Spend"))

resultDf.show()