亚洲鞋业资讯网,最新潮鞋资讯分享!

微信号:weixin888

2023_Spark_实验十一:RDD高级算子操作

时间:2024-03-31 21:04人气: 编辑:亚洲鞋业资讯网


//checkpoint :

sc.setCheckpointDir("hdfs://Master:9000/ck") // 设置检查点

val rdd = sc.textFile("hdfs://Master:9000/input/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) // 执行wordcount任务的转换



rdd.checkpoint // Mark this RDD for checkpointing.

rdd.isCheckpointed

rdd.count //触发计算,日志显示:ReliableRDDCheckpointData: Done checkpointing RDD 27 to hdfs://hadoop001:9000/ck/fce48fd4-d76f-

4322-8d23-6a48d1aed7b5/rdd-27, new parent is RDD 28

rdd.isCheckpointed // res61: Boolean = true

rdd.getCheckpointFile // Option[String] = Some(hdfs://Master:9000/ck/b9a5add8-18d8-4056-9e8e-271d9522a29c/rdd-4)

coalesce :

总所周知,spark的rdd编程中有两个算子repartition和coalesce。公开的资料上定义为,两者都是对spark分区数进行调整的算子。

        repartition会经过shuffle,其实际上就是调用的coalesce(shuffle=true)。

        coalesce,默认shuffle=false,不会经过shuffle。

        当前仅针对coalesce算子考虑,我们看一下官方的定义:

        大概意思为:如果你想要从1000个分区到100个分区,并且不经过shuffle,近乎平均分配10个父分区到1个子分区。

        首先我说下我个人简单理解:不经过shuffle,就意味着coalesce算子前后都是在一个stage中的。从该stage开始到coalesce算子之前的任务的迭代执行的并行度都是1000,从coalesce算子开始到该stage结束的任务的迭代执行的并行度都是100。


val rdd1 = sc.parallelize(1 to 10, 10)

// 重新分区,分为两个2 ,不产生shuffle

val rdd2 = rdd1.coalesce(2, false)



// 获取新的RDD分区数

rdd2.partitions.length

def func1(index:Int,iter:Iterator[Int]):Iterator[String] = {

iter.toList.map(x=>"[PartID:"+index + ",value=" + x +"]").iterator

}

// 查看分区后的结果:

rdd2.mapPartitionsWithIndex(func1).collect



repartition:

val rdd1 = sc.parallelize(1 to 10, 4)

val rdd2 = rdd1.repartition(5)

collect、toArray

将RDD转换为Scala的数组。

collectAsMap

与collect、toArray相似。collectAsMap将key-value型的RDD转换为Scala的map。

注意:map中如果有相同的key,其value只保存最后一个值。


# 创建一个2分区的RDD

scala> var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

z: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[129] at parallelize at :21

# 输出所有分区的数据

scala> z.collect

res44: Array[(String, Int)] = Array((cat,2), (cat,5), (mouse,4), (cat,12), (dog,12), (mouse,2))



# 转化为字典

scala> z.collectAsMap

res45: scala.collection.Map[String,Int] = Map(dog -> 12, cat -> 12, mouse -> 2)

scala>


collectAsMap

val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

//res2: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

combineByKey与aggregateByKey相比较:

  1. 1.相同点:

  • 两者都能映射key值分别进行分区内计算和分区间计算。

    2. 不同点:

  • combineByKey有三个参数列表而且不需要初始值,而aggregateByKey只有两个参数列表且需要初始值。

aggregateByKey分区内计算示意图

//aggregateByKey存在函数颗粒化,有两个参数列表

//第一个参数列表,需要传递一个参数,表示为初始值

// 主要当碰见第一个key时候,和value进行分区内计算

//第二个参数列表,需要传递2个参数

// 第一个参数表示分区内计算

// 第二个参数表示分区间计算



rdd.aggregateByKey(zeroValue = 0)(

(x, y) => math.max(x, y),

(x, y) => x + y

).collect().foreach(println)
标签: [db:TAGS]  
相关资讯
热门频道

热门标签

官方微信官方微博百家号

网站简介 | 意见反馈 | 联系我们 | 法律声明 | 广告服务

Copyright © 2002-2024 欧亿●平台●官网-OE SPORTS 版权所有 网站地图 备案号:粤06094508号