先简单介绍下两个函数的区别,treeAggregate在aggregate的基础上会多一个depth参数,默认为2,另外在执行comb函数时,aggregate会多执行一次,目前实验没看到其他区别。


aggregate有三个参数,长的很奇怪,是吧?第一个参数是zeroValue,可以理解为初始值,第二个参数是seq函数,第三个参数是comb函数。treeAggregate类似,只是多了一个depth参数。


seq函数作用于每个元素,comb函数作用于每个seq函数输出的结果。


测试代码如下:

val d = sc.parallelize(List(1, 2),3)

def seq(a:Int, b:Int):Int={
    val result = a * b * b
    println("seq, " + a + ", " + b + ", " + result)
    result
}

def comb(a:Int, b:Int):Int={
    val result = a + b
    println("comb, " + a + ", " + b + ", " + result)
    result
}

d.aggregate(100)(seq, comb)

d.treeAggregate(100)(seq, comb, 2)

d.treeAggregate(100)(seq, comb, 3)


输出如下:

# d.aggregate(100)(seq, comb)
seq, 100, 2, 400
seq, 100, 1, 100
comb, 100, 400, 500
comb, 500, 100, 600
comb, 600, 100, 700
Int = 700
# d.treeAggregate(100)(seq, comb, 2)
seq, 100, 2, 400
seq, 100, 1, 100
comb, 100, 100, 200
comb, 200, 400, 600
Int = 600
# d.treeAggregate(100)(seq, comb, 3)
seq, 100, 1, 100
seq, 100, 2, 400
comb, 100, 400, 500
comb, 500, 100, 600
Int = 600


看点更复杂的例子,求1到20的平方和:

val d = sc.parallelize(1 to 20, 5)

def seq(a:(Int, Int), b:Int):(Int, Int)={
  val sum = a._1 + b * b
  val count = a._2 + 1
  println("seq:\t" + a + "\t" + b + "\t" + sum + "\t" + count)
  (sum, count)
}

def comb(a:(Int, Int), b:(Int, Int)):(Int, Int)={
  val sum = a._1 + b._1
  val count = a._2 + b._2
  println("comb:\t" + a + "\t" + b + "\t" + sum + "\t" + count)
  (sum, count)
}

d.aggregate((0,0))(seq, comb)

d.treeAggregate((0,0))(seq, comb, 2)

d.treeAggregate((0,0))(seq, comb, 3)