首页 > 大数据 > spark入门教程(3)--Spark 核心API开发
2017
02-17

spark入门教程(3)--Spark 核心API开发

千淘万漉博客阿里云大使推广链接

 

本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍。

        

 


        Spark综合了前人分布式数据处理架构和语言的优缺点,使用简洁、一致的函数式语言Scala作为主要开发语言,同时为了方便更多语言背景的人使用,还支持JavaPythonR语言。Spark因为其弹性分布式数据集(RDD)的抽象数据结构设计,通过实现抽象类RDD可以产生面对不同应用场景的子类。本章将先介绍Spark编程模型、RDD的相关概念、常用API源码及应用案例,然后具体介绍四大应用框架,为后续进一步学习Spark框架打下基础。


3.1 Spark 编程模型概述

Spark的编程模型如图3-1所示。

 

3-1 Spark编程模型

       开发人员在编写Spark应用的时候,需要提供一个包含main函数的驱动程序作为程序的入口,开发人员根据自己的需求,在main函数中调用Spark提供的数据操纵接口,利用集群对数据执行并行操作。

       Spark为开发人员提供了两类抽象接口。第一类抽象接口是弹性分布式数据集(Resilient Distributed Dataset,下文简称RDD),顾名思义,RDD是对数据集的抽象封装,开发人员可以通过RDD提供的开发接口来访问和操纵数据集合,而无需了解数据的存储介质(内存或磁盘)、文件系统(本地文件系统、HDFS或Tachyon)、存储节点(本地或远程节点)等诸多实现细节;第二类抽象是共享变量(Shared Variables),通常情况下,一个应用程序在运行的时候会被划分成分布在不同执行器之上的多个任务,从而提高运算的速度,每个任务都会有一份独立的程序变量拷贝,彼此之间互不干扰,然而在某些情况下需要任务之间相互共享变量,Apache Spark提供了两类共享变量,它们分别是:广播变量(Broadcast Variable)和累加器(Accumulators)。第3.3节将介绍RDD的基本概念和RDD提供的编程接口,并在后面详细解读接口的源码实现,从而加深对RDD的理解,此外会在第3.4节中介绍两类共享变量的使用方法。


3.2 Spark Context

       SparkContext是整个项目程序的入口,无论从本地读取文件(textfile方法)还是从HDFS读取文件或者通过集合并行化获得RDD,都先要创建SparkContext对象,然后使用SparkContext对RDD进行创建和后续的转换操作。本节主要介绍SparkContext类的作用和创建过程,然后通过一个简单的例子向读者介绍SparkContext的应用方法,从应用角度来理解其作用。


3.2.1  SparkContext的作用

 

SparkContext除了是Spark的主要入口,它也可以看作是对用户的接口,它代表与Spark集群的连接对象,由图3-2可以看到,SparkContext主要存在于Driver Program中。可以使用SparkContext来创建集群中的RDD、累积量和广播量,在后台SparkContext还能发送任务给集群管理器。每一个JVM只能有运行一个程序,即对应只有一个SparkContext处于激活状态,因此在创建新的SparkContext前需要把旧的SparkContext停止。


3-2  SparkContextSpark架构图中的位置

3.2.2 SparkContext创建

       SparkContext的创建过程首先要加载配置文件,然后创建SparkEnvTaskSchedulerDAGScheduler,具体过程和源码分析如下。

1.加载配置文件SparkConf

       SparkConf在初始化时,需先选择相关的配置參数,包含master、appName、sparkHome、jars、environment等信息,然后通过构造方法传递给SparkContext,这里的构造函数有多种表达形式,当SparkContex获取了全部相关的本地配置信息后开始下一步操作。

[java] view plain copy
  1. def this(master: String, appName: String, conf: SparkConf) =  

  2.     this(SparkContext.updatedConf(conf, master, appName))  

  3. def this(  

  4.   master: String,  

  5.   appName: String,  

  6.   sparkHome: String = null,  

  7.   jars: Seq[String] = Nil,  

  8.   environment: Map[String, String] = Map(),  

  9.   preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =  

  10.   {  

  11.     this(SparkContext.updatedConf(newSparkConf(),master,appName,sparkHome,jars,environment))  

  12.     this.preferredNodeLocationData = preferredNodeLocationData  

  13.   }  

 


 

2.创建SparkEnv
  创建SparkConf后就需要创建SparkEnv,这里面包括了很多Spark执行时的重要组件,包括 MapOutputTracker、ShuffleFetcher、BlockManager等,在这里源码是通过SparkEnv类的伴生对象SparkEnv Object内的createDriverEnv方法实现的。
[java] view plain copy
  1. private[spark] defcreateDriverEnv(  

  2.       conf: SparkConf,  

  3.       isLocal: Boolean,  

  4.       listenerBus: LiveListenerBus,  

  5.       mockOutputCommitCoordinator:Option[OutputCommitCoordinator] = None): SparkEnv = {  

  6.    assert(conf.contains("spark.driver.host"),"spark.driver.host is not set on the driver!")  

  7.     assert(conf.contains("spark.driver.port"),"spark.driver.port is not set on the driver!")  

  8.     val hostname =conf.get("spark.driver.host")  

  9.     val port =conf.get("spark.driver.port").toInt  

  10.     create(  

  11.       conf,  

  12.       SparkContext.DRIVER_IDENTIFIER,  

  13.       hostname,  

  14.       port,  

  15.       isDriver = true,  

  16.       isLocal = isLocal,  

  17.       listenerBus = listenerBus,  

  18.       mockOutputCommitCoordinator =mockOutputCommitCoordinator  

  19.   )  

  20. }  



 

[java] view plain copy
  1. <p style="background:#F3F3F3;">  

  2. </p>  

 

3.创建TaskScheduler

        创建SparkEnv后,就需要创建SparkContext中调度执行方面的变量TaskScheduler

[java] view plain copy
  1.  private[spark] var (schedulerBackend, taskScheduler) =    

  2.    SparkContext.createTaskScheduler(this, master)  

  3. private val heartbeatReceiver = env.actorSystem.actorOf(  

  4.     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")  

  5.   @volatile private[spark] var dagScheduler: DAGScheduler = _  

  6.   try {  

  7.     dagScheduler = new DAGScheduler(this)  

  8.   } catch {  

  9.     case e: Exception => {  

  10.       try {  

  11.         stop()  

  12.       } finally {  

  13.         throw new SparkException("Error while constructing DAGScheduler", e)  

  14.       }  

  15.     }  

  16.   }  

  17.   

  18.   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's  

  19.   // constructor  

  20.   taskScheduler.start()  

        TaskScheduler是依据Spark的执行模式进行初始化的,详细代码在SparkContext中的createTaskScheduler方法中。在这里以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,然后创建SparkDeploySchedulerBackend并初始化,最后返回Scheduler对象。

[java] view plain copy
  1. case SPARK_REGEX(sparkUrl) =>  

  2.         val scheduler = new TaskSchedulerImpl(sc)  

  3.         val masterUrls = sparkUrl.split(",").map("spark://" + _)  

  4.         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)  

  5.         scheduler.initialize(backend)  

  6.         (backend, scheduler)  


4.创建DAGScheduler

创建TaskScheduler对象后,再将TaskScheduler对象传至DAGScheduler,用来创建DAGScheduler对象。

[java] view plain copy
  1. @volatile private[spark] var dagScheduler: DAGScheduler = _  

  2.   try {  

  3.     dagScheduler = new DAGScheduler(this)  

  4.   } catch {  

  5.     case e: Exception => {  

  6.       try {  

  7.         stop()  

  8.       } finally {  

  9.         throw new SparkException("Error while constructing DAGScheduler", e)  

  10.       }  

  11.     }  

  12.   }  

  13.  def this(sc: SparkContext) = this(sc, sc.taskScheduler)  

创建DAGScheduler后再调用其start()方法将其启动。以上4点是整个SparkContext的创建过程,这其中包含了很多重要的步骤,从这个过程能理解Spark的初始启动情况。



3.2.3 使用shell

        除了单独编写一个应用程序的方式之外,Spark还提供了一个交互式Shell来使用。在Shell中,用户的每条语句都能在输入完毕后及时得到结果,而无需手动编译和运行程序。Shell的使用十分简单,改变当前工作路径到Spark的安装目录,执行指令$ ./bin/spark-shell即可进入Shell。
         在Shell中,系统根据命令提供的参数自动配置和生成了一个SparkContext对象sc,直接使用即可,无需再手动实例化SparkContext。除了结果会实时显示之外,其余操作与编写单独应用程序类似。读者可直接参考Spark官方提供的Spark ProgrammingGuide等文档,在此不做具体介绍。



3.2.4 应用实践
 这里向读者介绍一段用于统计文件中字母a和字母b出现频率的Spark应用,通过这个程序向读者展示SparkContext的用法。

【例3-1】简单的Spark程序

 

[java] view plain copy
  1. /* SimpleApp.scala */  

  2. import org.apache.spark.SparkContext  

  3. import org.apache.spark.SparkContext._  

  4. import org.apache.spark.SparkConf  

  5. object SimpleApp {  

  6.   def main(args: Array[String]) {  

  7.     val logFile = "YOUR_SPARK_HOME/README.md"               // 本地文件目录   

  8.     val conf = new SparkConf().setAppName("Simple Application")       //给Application命名    

  9.     val sc = new SparkContext(conf)                                //创建SparkContext  

  10.     val logData = sc.textFile(logFile, 2).cache()                      //缓存文件  

  11.     val numAs = logData.filter(line => line.contains("a")).count()         //计算字母a的个数  

  12.     val numBs = logData.filter(line => line.contains("b")).count()         //计算字母b的个数  

  13.     println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))   //打印结果  

  14.   }  


 

        这个例子中,首先创建本地文件目录logFile和配置文件conf,然后使用配置信息conf实例化SparkContext得到sc得到sc之后就可以从本地文件中读取数据并把数据转化成RDD,并命名为logData,然后logData调用filter方法分别计算包含字母a的行数和包含字母b的行数,最后打印出结果。该例子中使用了SparkContext的实例化对象创建RDD数据集。


3.3 RDD简介

       本节主要介绍弹性分布式数据集RDD的相关概念,其中包括RDD创建来源、两种重要的TransformationAction操作、数据持久化和检查点机制,通过对SparkRDD核心抽象的深入理解,能帮助读者全面理解后面的RDD的分区、并行计算和依赖等机制和变换过程。


3.3.1 RDD创建

 

       RDDSpark应用程序开发过程中最为基本也最为重要的一类数据结构,RDD被定义为只读、分区化的记录集合,更为通俗来讲,RDD是对原始数据的进一步封装,封装导致两个结果:第一个结果是数据访问权限被限制,数据只能被读,而无法被修改;第二个结果是数据操作功能被强化,使得数据能够实现分布式存储、并发处理、自动容错等等诸多功能。Spark的整个计算过程都是围绕数据集RDD来进行,下面将会对RDD的创建以及数据结构进行简单介绍。

1.RDD的两类来源

    1)将未被封装的原始数据进行封装操作得到,根据原始数据的存在形式,又可被进一步分成由集合并行化获得或从外部数据集中获得。

    2)由其他RDD通过转换操作获得,由于RDD的只读特性,内部的数据无法被修改,因此RDD内部提供了一系列数据转换(Transformation)操作接口,这类接口可返回新的RDD,而不影响原来的RDD内容。在后面第3章3.3节中将会对RDD的创建方法进行更加详尽的说明。

2.RDD内部数据结构

1)分区信息的列表

2)对父RDD的依赖信息

3)对Key-Value键值对数据类型的分区器(可选)

4)计算分区的函数

5)每个数据分区的物理地址列表(可选)

        RDD的数据操作并非在调用内部接口的一刻便开始计算,而是遇到要求将数据返回给驱动程序,或者写入到文件的接口时,才会进行真正的计算,这类会触发计算的操作称为动作(Action)操作,而这种延时计算的特性,被称为RDD计算的惰性(Lazy),在第六章机篇将分别讲述动作操作和惰性特征。

    在第1章中说过,Spark是一套内存计算框架,其能够将频繁使用的中间数据存储在内存当中,数据被使用的频率越高,性能提升越明显。数据的内存化操作在RDD层次上,体现为RDD的持久化操作,在3.3.4节描述RDD的持久化操作。除此之外,RDD还提供了类似于持久化操作的检查点机制,表面看上去与存储在HDFS的持久化操作类似,实际使用上又有诸多不同,在3.3.5小节描述RDD的检查点机制。


3.3.2 RDD转换操作

 

        转换(Transformation)操作是由一个RDD转换到另一个新的RDD,例如,map操作在RDD中是一个转换操作,map转换会让RDD中的每一个数据都通过一个指定函数得到一个新的RDD。

        RDD内部可以封装任意类型的数据,但某些操作只能应用在封装键值对类型数据的RDD之上,例如转换操作reduceByKey、groupByKey和countByKey等。

       表3-1展示了RDD所提供的所有转换操作及其含义。

表3-1:RDD提供的转换操作

Transformation

算子作用

map(func)

新RDD中的数据由原RDD中的每个数据通过函数func得到

filter(func)

新RDD种的数据由原RDD中每个能使函数func返回true值的数据组成

flatMap(func)

类似于map转换,但func的返回值是一个Seq对象,Seq中的元素个数可以是0或者多个

mapPartitions(func)

类似于map转换,但func的输入不是一个数据项,则是一个分区,若RDD内数据类型为T,则func必须是Iterator<T> => Iterator<U>类型

mapPartitionsWithIndex(func)

类似于mapPartitions转换,但func的数据还多了一个分区索引,即func类型是(Int, Iterator<T> => Iterator<U>)

sample(withReplacement, fraction, seed)

对fraction中的数据进行采样,可以选择是否要进行替换,需要提供一个随机数种子

union(otherDataset)

新RDD中数据是原RDD与RDD otherDataset中数据的并集

Intersection(otherDataset)

新RDD中数据是原RDD与RDD otherDataset中数据的交集

distinct([numTasks])

新RDD中数据是原RDD中数据去重的结果

groupByKey([numTasks])

原RDD中数据类型为(K, V)对,新RDD中数据类型为(K, Iterator(V))对,即将相同K的所有V放到一个迭代器中

reduceByKey(func, [numTasks])

原RDD和新RDD数据的类型都为(K, V)对,让原RDD相同K的所有V依次经过函数func,得到的最终值作为K的V

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

原RDD数据的类型为(K, V),新RDD数据的类型为(K, U),类似于groupbyKey函数,但聚合函数由用户指定。键值对的值的类型可以与原RDD不同

sortByKey([ascending], [numTasks])

原RDD和新RDD数据的类型为(K, V)键值对,新RDD的数据根据ascending的指定顺序或者逆序排序

join(otherDataset, [numTasks])

原RDD数据的类型为(K, V),otherDataset数据的类型为(K, W),对于相同的K,返回所有的(K, (V, W))

cogroup(otherDataset, [numTasks])

原RDD数据的类型为(K, V),otherDataset数据的类型为(K, W),对于相同的K,返回所有的(K, Iterator<V>, Iterator<W>)

catesian(otherDataset)

原RDD数据的类型为为T,otherDataset数据的类型为U,返回所有的(T, U)

pipe(command, [envValue])

令原RDD中的每个数据以管道的方式依次通过命令command,返回得到的标准输出

coalesce(numPartitions)

减少原RDD中分区的数目至指定值numPartitions

repartition(numPartitions)

修改原RDD中分区的数目至指定值numPartitions



3.3.3 RDD动作操作

 

        相对于转换,动作(Action)操作用于向驱动(Driver)程序返回值或者将值写入到文件当中。例如reduce动作会使用同一个指定函数让RDD中的所有数据做一次聚合,把运算的结果返回。表3-2展示了RDD所提供的所有动作操作及其含义。

3-2RDD提供的动作操作

Action

算子作用

reduce(func)

令原RDD中的每个值依次经过函数func,func的类型为(T, T) => T,返回最终结果

collect()

将原RDD中的数据打包成数组并返回

count()

返回原RDD中数据的个数

first()

返回原RDD中的第一个数据项

take(n)

返回原RDD中前n个数据项,返回结果为数组

takeSample(withReplacement, num, [seed])

对原RDD中的数据进行采样,返回num个数据项

saveAsTextFile(path)

将原RDD中的数据写入到文本文件当中

saveAsSequenceFile(path)(Java and Scala)

将原RDD中的数据写入到序列文件当中

savaAsObjectFile(path)(Java and Scala)

将原RDD中的数据序列化并写入到文件当中。可以通过SparkContext.objectFile()方法加载

countByKey()

原RDD数据的类型为(K, V),返回hashMap(K, Int),用于统计K出现的次数

foreach(func)

对于原RDD中的每个数据执行函数func,返回数组

 

3.3.4 惰性计算

 

        需要注意的是,一个RDD执行转换操作之后,数据的计算是延迟的,新生成的RDD会记录转换的相关信息,包括父RDD的编号、用户指定函数等等,但并不会立即执行计算操作,真正的计算操作过程得等到遇到一个动作操作(Action)才会执行,此外,除非用户指定持久化操作,否则转换过程中产生的中间数据在计算完毕后会被丢弃,即数据是非持久化。即使对同一个RDD执行相同的转换操作,数据同样会被重新计算。

       Spark采取惰性计算机制有其道理所在。例如可以实现通过map方法创建的一个新数据集,然后使用reduce方法,最终只返回 reduce 的结果给driver,而不是整个大的新数据集。


3.3.5 RDD持久化

    

        惰性计算的缺陷也是明显的:中间数据默认不会保存,每次动作操作都会对数据重复计算,某些计算量比较大的操作可能会影响到系统的运算效率,因此Spark允许将转换过程中手动将某些会被频繁使用的RDD执行持久化操作,持久化后的数据可以被存储在内存、磁盘或者Tachyon当中,这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。

        通过调用RDD提供的cache或persist函数即可实现数据的持久化,persist函数需要指定存储级别(StorageLevel),cache等价于采用默认存储级别的persist函数,Spark提供的存储级别及其含义如表3-3所示。在6.4节会继续讨论RDD持久化过程在源码级别上的实现细节。

表3-3  RDD的存储级别

存储级别

含义

MEMORY_ONLY

把RDD以非序列化状态存储在内存中,如果内存空间不够,则有些分区数据会在需要的时候进行计算得到

MEMORY_AND_DISK

把RDD以非序列化存储在内存中,如果内存空间不够,则存储在硬盘中

 

MEMORY_ONLY_SER

把RDD以Java对象序列化储存在内存中,序列化后占用空间更小,尤其当使用快速序列化库(如Kyro[1])时效果更好。缺点是读数据要反序列化,会消耗CPU计算资源

MEMORY_AND_DISK_SER

类似MEMORY_ONLY_SER,区别是当内存不够的时候会把RDD持久化到磁盘中,而不是在需要它们的时候实时计算

DISK_ONLY

只把RDD存储到磁盘中

MEMORY_ONLY_2,

类似MEMORY_ONLY,不同的是会复制一个副本到另一个集群节点

MEMORY_AND_DISK_2, etc.

类似MEMORY_AND_DISK,不同的是会复制一个副本到另一个集群节点

 

OFF_HEAP

把RDD以序列化形式存储在Tachyon中,与MEMORY_ONLY_SER不同的是,使用OFF-HEAP模式会减少垃圾回收的开销,此外还能让执行器共享内存,这种模式更适应于多并发和对内存要求高的环境

 


3.3.6 RDD检查点

        因为DAG中血统(lineage)如果太长,当重计算的时候开销会很大,故使用检查点机制,将计算过程持久化到磁盘,这样如果出现计算故障的时候就可以在检查点开始重计算,而不需要从头开始。RDD的检查点(Checkpoint)机制类似持久化机制中的persist(StorageLevel.DISK_ONLY),数据会被存储在磁盘当中,两者最大的区别在于:持久化机制所存储的数据,在驱动程序运行结束之后会被自动清除;检查点机制则会将数据永久存储在磁盘当中,如果不手动删除,数据会一直存在。换句话说,检查点机制存储的数据能够被下一次运行的应用程序所使用。

        检查点的使用与持久化类似,调用RDD的checkpoint方法即可。在6.4小节中继续介绍检查点机制的实现以及其与持久化过程的区别。


3.4 共享变量

        因为在tasks之间读写共享变量会很低效,spark提供两种类型的共享变量类型,即broadcast variablesaccumulators


3.4.1 广播变量
        广播变量(Broadcast variables)允许用户将一个只读变量缓存到每一台机器之上,而不像传统变量一样,拷贝到每一个任务当中,同一台机器上的不同任务可以共享该变量值。如例3-1代码所示,对于变量v,只需要调用SparkContext.broadcast(v)即可得到变量v的广播变量broadcastVar,通过调用broadcastVar的value方法即可取得变量值。

【例3-1】广播变量的用法

[java] view plain copy
  1. scala> val broadcastVar = sc.broadcast(Array(123))  

  2. broadcastVar:spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)  

  3. scala> broadcastVar.value  

  4. res0: Array[Int] = Array(123)  


3.4.2 累加器

        累加器(Accumulators)是另外一种共享变量。累加器变量只能执行加法操作,但其支持并行操作,这意味着不同任务多次对累加器执行加法操作后,加法器最后的值等于所有累加的和。累加器的值只能被驱动程序访问,集群中的任务无法访问该值。

【例3-2】累加器的用法

[java] view plain copy
  1. scala> val accum = sc.accumulator(0"My Accumulator")  

  2. scala> accum.value()     //(通过这种方法进行读取原始变量值)  

  3. accum: spark.Accumulator[Int] = 0  

  4. scala> sc.parallelize(Array(1234)).foreach(x => accum += x)  

  5. res2:Int = 10  



3.5 Spark核心开发实践

        本节主要介绍核心开发中RDD的两个主要操作算子TransformationAction的使用方法,由于Spark是基于延迟计算,Transforamation算子并不立即执行,这时只是保存计算状态,当Action算子出现才真正执行计算。为此下面就这两个算子分别学习主要的API方法和应用实例,如果想了解更多关于RDDAPI操作,建议读者参考拉筹伯大学教授的个人主页http://homepage.cs.latrobe.edu.au/zhe/。


3.5.1 单值型Tranformation算子

单值型的算子就是输入为单个值形式,这里主要介绍map、flatMap、mapPartitions、union、cartesian、groupBy、filter、distinct、subtract、foreach、cache、persist、sample以及takeSample方法,如表3-4列出各方法的简要概述。

表3-4  单值型Transformation算子

方法名

方法定义

map

def map[U](f: (T) ? U)(implicit arg0: ClassTag[U]): RDD[U]

flatMap

defmapPartitions[U](f: (Iterator[T])? Iterator[U], preservesPartitioning: Boolean = false)

mapPartition

def mapPartitions[U](f: (Iterator[T])? Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

mapPartitionsWith

Index

def mapPartitionsWithIndex[U](f: (Int, Iterator[T])? Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

foreach

def foreach(f: (T) ? Unit): Unit

foreachPartition

def foreachPartition(f: (Iterator[T])? Unit): Unit

glom

def glom(): RDD[Array[T]]

union

def union(other: RDD[T]): RDD[T]

cartesian

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

groupBy

def groupBy[K](f: (T) ? K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])]

filter

def filter(f: (T) ? Boolean): RDD[T]

distinct

def distinct(): RDD[T]

subtract

def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

cache

def cache(): RDD.this.type

persist

def persist(): RDD.this.type

sample

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

takeSample

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

 

1map

        对原来每一个输入的RDD数据集进行函数转换,返回的结果为新的RDD,该方法对分区操作是一对一的。

方法源码实现:

def map[U: ClassTag](f: T =>U): RDD[U] = new MappedRDD(this, sc.clean(f))

【例3-3map方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(List("bit""linc""xwc""fjg""wc","spark"), 3)  //创建RDD  

  2. val b = a.map(word => word.length)       //计算每个单词的长度  

  3. val c = a.zip(b)        //拉链方法,把两列数据对应配对成键值对格式  

  4. c.collect       //把结果转换为数组    

  5. res0: Array[(String, Int)] = Array((bit,3), (linc,4), (xwc,3), (fjg,3), (wc,2),(spark,5))  

        这个例子中map方法从a中依次读入一个单词,然后计算单词长度,把最后计算的长度赋给b,然后因为ab的长度相同,使用zip方法将ab中对应元素组成K-V键值对形式,最后使用Action算子中的collect方法把键值对以数组形式输出。


3-3  map方法应用样例

2flatMap

flapMap方法与map方法类似,但是允许在一次map方法中输出多个对象,而不是map中的一个对象经过函数转换生成另一个对象。

方法源码实现:

def flatMap[U: ClassTag](f: T=> TraversableOnce[U]): RDD[U] =new FlatMappedRDD(this, sc.clean(f))

【例3-4flatMap方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 105)      //生成从1到10的序列,5个分区  

  2. a.flatMap(num => 1 to num).collect       //方法的作用是把每一个num映射到从1到num的序列  

  3. res47: Array[Int] = Array(11212312341234512345612345671234567812345678912345678910)  


这个例子先得到从110的序列,然后调用flatMap方法对输入num依次生成从1num的序列,最后使用collect方法转换成数组输出。

3mapPartitions

mapPartitionsmap的另一个实现。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是作用于每个分区,也就是把每个分区中的内容作为整体来处理的。

方法源码实现:

def mapPartitions[U:ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean =false): RDD[U] = {

    val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(iter)

    new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)

       }


【例3-5mapPartitions方法应用样例

[java] view plain copy
  1. scala> val a = sc.parallelize(1 to 93)  

  2. scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {         

  3.     var res = List[(T, T)]()   

  4. var pre = iter.next       

  5. while (iter.hasNext) {  

  6.         val cur = iter.next  

  7.         res .::= (pre, cur)    

  8.         pre = cur         

  9.     }   

  10.     res.iterator  

  11. }  

  12. scala> a.mapPartitions(myfunc).collect  

  13. res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))  


如图3-4,这个例子是先得到从19的序列,因为有3个分区,所以每个分区数据分别是(1,2,3),(4,5,6)和(7,8,9),然后调用mapPartitions方法,因为scala是函数式编程,函数能作为参数值,所以mapPartition方法输入参数是myfunc函数。myfunc函数的作用是先构造一个空list集合,输入单元素集合iter,输出双元素Tuple集合,把分区中一个元素和它的下一个元素组成一个Tuple。因为每个分区中最后一个元素没有下一个元素,所以(3,4)(6,7)不在结果中。

mapPartitions还有其他的类似实现,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数,此外还有mapPartitionsWithIndex,它能把分区中的index信息传递给用户指定的输入函数,这些其他类似的实现都是基于map方法,只是细节不同,这样做更方面使用者在不同场景下的应用。

3-4  mapPartitions方法应用样例


4mapPartitionWithIndex

mapPartitionWithIndex方法与mapPartitions方法功能类似,不同的是mapPartition-WithIndex还会对原始分区的索引进行追踪,这样能知道分区所对应的元素,方法的参数为一个函数,函数的输入为整型索引和迭代器。

方法源码实现:

def mapPartitionsWithIndex[U:ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U] = {

    val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(index, iter)

    new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)

}

【例3-6mapPartitionWithIndex方法应用

[java] view plain copy
  1. val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)  

  2. def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {  

  3.   iter.toList.map(x => index + "," + x).iterator  

  4. }  

  5. x.mapPartitionsWithIndex(myfunc).collect()  

  6. res10: Array[String] = Array(0,10,20,31,41,51,62,72,82,92,10)  


这个例子中先得到一个名为的x序列,然后调用mapPartitionsWithIndex方法,参数为myfunc函数,这个函数的实现是把输入经过map方法映射为分区索引加值的形式。结果中的01表示分区下标0和第一个输入值1,后面依次输出其他分区和对应的值,说明分区数是从下标0开始的。

5.foreach

foreach方法主要是对输入的数据对象执行循环操作,该方法常用来输出RDD中的内容。

方法源码实现:

def foreach(f: T => Unit) {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))

}

【例3-7foreach方法应用

[java] view plain copy
  1. val c = sc.parallelize(List("xwc""fjg""wc""dcp""zq""snn""mk""zl""hk""lp"), 3)  

  2. c.foreach(x => println(x + " are from BIT"))  

  3. xwc are from BIT  

  4. fjg are from BIT  

  5. wc are from BIT  

  6. dcp are from BIT  

  7. zq are from BIT  

  8. ssn are from BIT  

  9. mk are from BIT  

  10. zl are from BIT  

  11. hk are from BIT  

  12. lp are from BIT  


这个方法比较直观,直接对c变量中的每一个元素对象使用println函数,打印对象内容。

6foreachPartition

foreachPartition方法的作用是通过迭代器参数对RDD中每一个分区的数据对象应用函数。mapPartitions方法的作用于foreachPartition方法作用非常相似,区别就在于使用的参数是否有返回值。

方法源码实现:

def foreachPartition(f:Iterator[T] => Unit) {

    val cleanF = sc.clean(f)

    sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))

}

【例3-8foreachPartition方法应用样例

[java] view plain copy
  1. val b = sc.parallelize(List(123456789), 3)  

  2.  b.foreachPartition(x => println((a,b) => x.reduce(a + b)))  

  3.  6  

  4. 15  

  5. 24  


这个例子是将序列b中的每一个元素进行reduce操作,对每个分区中输入的每一个元素累加,例如对于分区0,输入1和2相加等于3,然后把上个结果3与下一个输入3相加就等于6,其他分区的运算与该分区一样。

7glom

作用类似collect,但它不知直接将所有RDD直接转化为数组形式,glom方法的作用是将RDD中分区数据进行组装到数组类型RDD中,每一个返回的数组包含一个分区的元素,按分区转化为数组,最后有几个分区就返回几个数组类型的RDD

方法源码实现:

def glom(): RDD[Array[T]] = newGlommedRDD(this)

private[spark] classGlommedRDD[T: ClassTag](prev: RDD[T])extends RDD[Array[T]](prev) {

    overridedef getPartitions: Array[Partition] = firstParent[T].partitions

    overridedef compute(split: Partition, context: TaskContext) =

    Array(firstParent[T].iterator(split,context).toArray).iterator

}

【例3-9glom方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 993)  

  2. a.glom.collect  

  3. res5: Array[Array[Int]] = Array(Array(123456789101112131415161718192021222324252627282930313233), Array(343536373839404142434445464748495051525354555657585960616263646566), Array(676869707172737475767778798081828384858687888990919293949596979899))  


这个例子很简洁,在执行glom方法后就调用collect方法获得Array数组并输出,可以看出a.glom方法输出的是三个数组组成的RDD,其中每个数组代表一个分区数据。

8union

union方法(等价于“++”)是将两个RDD取并集,取并集过程中不会把相同元素去掉。union操作是输入分区与输出分区多对一模式,如图所示。

方法源码实现:

def union(other: RDD[T]):RDD[T] = new UnionRDD(sc, Array(this, other))

class UnionRDD[T: ClassTag](

    sc: SparkContext,

    var rdds: Seq[RDD[T]])

  extends RDD[T](sc, Nil) {

       overridedef getPartitions: Array[Partition] = {

          val array = newArray[Partition](rdds.map(_.partitions.size).sum)

          var pos = 0

          for ((rdd, rddIndex) <- rdds.zipWithIndex;split <- rdd.partitions) {

             array(pos) = new UnionPartition(pos,rdd, rddIndex, split.index)

             pos += 1

          }

          array

      }

     overridedef getDependencies: Seq[Dependency[_]] = {

         valdeps = new ArrayBuffer[Dependency[_]]

         varpos = 0

         for(rdd <- rdds) {

            deps += new RangeDependency(rdd, 0, pos,rdd.partitions.size)

            pos += rdd.partitions.size

         }

         deps

    }

    override def compute(s: Partition, context:TaskContext): Iterator[T] = {

        valpart = s.asInstanceOf[UnionPartition[T]]

       parent[T](part.parentRddIndex).iterator(part.parentPartition,context)

    }

 

    overridedef getPreferredLocations(s: Partition): Seq[String] =

   s.asInstanceOf[UnionPartition[T]].preferredLocations()

    overridedef clearDependencies() {

       super.clearDependencies()

       rdds= null

    }

}

【例3-10union方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 42)  

  2. val b = sc.parallelize(2 to 41)  

  3. (a ++ b).collect  

  4. res4: Array[Int] = Array(1234234)  


如图3-5可见,这个例子先创建2RDD变量ab,然后对ab使用union方法,返回两个RDD并集的结果。

3-5  union方法应用样例


9cartesian

计算两个RDD中每个对象的笛卡尔积(例如第一个RDD中的每一个对象与第二个RDD中的对象join连接),但使用该方法时要注意可能出现内存不够的情况。

方法源码实现:

def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

class CartesianRDD[T: ClassTag,U: ClassTag](

    sc: SparkContext,

    var rdd1 : RDD[T],

    var rdd2 : RDD[U])

  extends RDD[Pair[T, U]](sc, Nil)

  with Serializable {

      valnumPartitionsInRdd2 = rdd2.partitions.size

       overridedef getPartitions: Array[Partition] = {

       //create the cross product split

         val array = newArray[Partition](rdd1.partitions.size * rdd2.partitions.size)

         for(s1 <- rdd1.partitions; s2 <- rdd2.partitions) {

            val idx = s1.index * numPartitionsInRdd2+ s2.index

            array(idx) = new CartesianPartition(idx,rdd1, rdd2, s1.index, s2.index)

         }

         array

      }

     overridedef getPreferredLocations(split: Partition): Seq[String] = {

        valcurrSplit = split.asInstanceOf[CartesianPartition]

        (rdd1.preferredLocations(currSplit.s1)++ rdd2.preferredLocations(currSplit.s2)).distinct

     }

    overridedef compute(split: Partition, context: TaskContext) = {

       valcurrSplit = split.asInstanceOf[CartesianPartition]

       for (x <- rdd1.iterator(currSplit.s1,context);

       y <- rdd2.iterator(currSplit.s2,context)) yield (x, y)

    }

    overridedef getDependencies: Seq[Dependency[_]] = List(new NarrowDependency(rdd1) {

          def getParents(id: Int): Seq[Int] = List(id/ numPartitionsInRdd2)

    },

    new NarrowDependency(rdd2) {

      def getParents(id: Int): Seq[Int] =List(id % numPartitionsInRdd2)

    }

)

  override def clearDependencies() {

    super.clearDependencies()

    rdd1 = null

    rdd2 = null

  }

}

【例3-11cartesian方法应用样例

 

[java] view plain copy
  1. val x =sc.parallelize(List(1,2,3),1)  

  2. val y =sc.parallelize(List(4,5),1)  

  3. x.cartesian(y).collect  

  4. res0: Array[(Int, Int)] =Array((1,4),(1,5),(2,4),(2,5),(3,4),(3,5))  

 

例子中x是第一个RDD,其中的每个元素都跟y中元素进行连接,如果第一个RDDm个元素,第二个RDD中元素n个,则求笛卡尔积后总元素为m×n个,本例结果为6个,如图3-6所示。

3-6  cartesian方法应用样例



 

 

10groupBy

groupBy方法有三个重载方法,功能是将元素通过map函数生成Key-Value格式,然后使用reduceByKey方法对Key-Value对进行聚合,具体可参考源码实现。

方法源码实现:

def groupBy[K](f: T => K, p:Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)

      : RDD[(K, Iterable[T])] = {

   valcleanF = sc.clean(f)                      //对用户函数预处理

   this.map(t=> (cleanF(t), t)).groupByKey(p)  //对数据进行map操作,生成Key-Value对,再聚合

}

def groupBy[K](f: T =>K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

    groupBy[K](f, defaultPartitioner(this))                //使用默认分区器

def groupBy[K](f: T => K,numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =

    groupBy(f, new HashPartitioner(numPartitions)) //使用hash分区器,分区数自定义

【例3-12groupBy方法应用样例

[java] view plain copy
  1. 1)val a = sc.parallelize(1 to 93)  

  2. a.groupBy(x => { if (x % 2 == 0"even" else "odd" }).collect  

  3. res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2468)), (odd,ArrayBuffer(1357,          9)))  

  4. 2)val a = sc.parallelize(1 to 93)  

  5. def myfunc(a: Int) : Int =  

  6. {  

  7.   a % 2  

  8. }  

  9. a.groupBy(myfunc).collect  

  10. res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2468)), (1,ArrayBuffer(13579)))  

  11. 3)val a = sc.parallelize(1 to 93)  

  12. def myfunc(a: Int) : Int =  

  13. {  

  14.   a % 2  

  15. }  

  16. a.groupBy(myfunc(_), 1).collect  

  17. res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2468)), (1,ArrayBuffer(13579)))  


第一个例子中是单个参数,调用groupBy方法,结果集的key只有两种,即evenodd,然后对相同的key进行聚合得到最终结果。第二个例子和第三个例子本质一样,只是使用的重载方法不同。

11filter

filter方法通过名称就能猜出来功能,其实就是对输入元素进行过滤,参数是一个返回值为boolean的函数,如果函数对输入元素运算结果为true,则通过该元素,否则将该元素过滤,不能进入结果集。

方法源码实现:

def filter(f: T => Boolean):RDD[T] = new FilteredRDD(this, sc.clean(f))

【例3-13filter方法应用样例

[java] view plain copy
  1. 1)val a = sc.parallelize(1 to 103)  

  2. val b = a.filter(x => x % 3 == 0)  

  3. b.collect  

  4. res3: Array[Int] = Array(369)  

  5. 2)val b = sc.parallelize(1 to 8)  

  6. b.filter(x => x < 4).collect  

  7. res15: Array[Int] = Array(123)  

  8. 3)val a = sc.parallelize(List("cat""horse"4.03.52"dog"))  

  9. a.filter(_ < 4).collect  

  10. <console>:15: error: value < is not a member of Any  


第一个和第二个例子比较好理解,因为a中元素都是整型,可以顺利进行比较,但第三个例子会报错,因为a中有部分对象不能与整数比较,如果使用scala中的偏函数就可以解决混合数据类型的问题。

12distinct

RDD中重复的元素去掉,只留下唯一的RDD元素。

方法源码实现:

def distinct(): RDD[T] =distinct(partitions.size)

def distinct(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] =

map(x => (x,null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

【例3-14distinct方法应用样例

[java] view plain copy
  1. 1)val c = sc.parallelize(List("Gnu""Cat""Rat""Dog""Gnu""Rat"), 2)  

  2. c.distinct.collect  

  3. res6: Array[String] = Array(Dog, Gnu, Cat, Rat)  

  4. 2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))  

  5. a.distinct(2).partitions.length  

  6. res16: Int = 2  

  7. 3)a.distinct(3).partitions.length  

  8. res17: Int = 3  


这个例子就是把RDD中的元素mapKey-Value对形式,然后使用reduceByKey将重复Key合并,也就是把重复元素删除,只留下唯一的元素。此外distinct有一个重载方法需要一个参数,这个参数就是分区数numPartitions,从例子中看出使用带参的distinct方法不仅能删除重复元素,而且还能对结果重新分区。

13subtract

subtract的含义就是求集合A-B的差,即把集合A中包含集合B的元素都删除,结果是剩下的元素。

方法源码实现:

def subtract(other: RDD[T], p:Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {

    if (partitioner == Some(p)) {

        val p2 = newPartitioner() {

           override def numPartitions = p.numPartitions

           overridedef getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)

        }

       // Unfortunately, since we're making a new p2,we'll get ShuffleDependencies

       //anyway, and when calling .keys, will not have a partitioner set, even though

       // the SubtractedRDD will, thanks to p2'sde-tupled partitioning, already be

       //partitioned by the right/real keys (e.g. p).

       this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p2).keys

      } else {

         this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p).keys

     }

}

【例3-15subtract方法应用样例

[java] view plain copy
  1. val a = sc.parallelize(1 to 93)  

  2. val b = sc.parallelize(1 to 33)  

  3. val c = a.subtract(b)  

  4. c.collect  

  5. res3: Array[Int] = Array(694758)  


这个例子就是把a中包含b中的元素都删除掉,底层实现使用subtractByKey,也就是根据键值对中的Key来删除a包含的b元素。

14persist,cache

cache方法顾名思义,是缓存数据,其作用是把RDD缓存到内存中,以方便下一次计算被再次调用。

方法源码实现:

 def cache(): this.type = persist()

【例3-16cache方法应用样例

[java] view plain copy
  1. val c = sc.parallelize(List("a""b""c""d""e""f"),1)  

  2. c.cache  

  3. res11: c.type = ParallelCollectionRDD[10] at parallelize at <console>:21  


这个例子就是直接把RDD缓存在内存中。

15persist

persist方法的作用是把RDD根据不同的级别进行持久化,通过使用带参数方法能指定持久化级别,如果不带参数则为默认持久化级别,即只保存到内存,与cache等价。

【例3-17persist方法应用样例

 

[java] view plain copy
  1. val a = sc.parallelize(1 to 93)  

  2. a.persist(StorageLevel.MEMORY_ONLY)  


 

这个例子使用persist方法,指定持久化级别为MEMORY_ONLY,该级别等价于cache方法。

16sample

sample的作用是随机的对RDD中的元素采样,获得一个新的子集RDD,根据参数能指定是否又放回采样、子集占总数的百分比和随机种子。

方法源码实现:

def sample(withReplacement:Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {

    require(fraction >= 0.0, "Negativefraction value: " + fraction)

    if (withReplacement) {

      new PartitionwiseSampledRDD[T, T](this,new PoissonSampler[T](fraction), true, seed)

    } else {

      new PartitionwiseSampledRDD[T, T](this,new BernoulliSampler[T](fraction), true, seed)

    }

 }

【例3-18sample方法应用样例

[java] view plain copy
  1. 1)val a = sc.parallelize(1 to 10002)  

  2. a.sample(false0.10).collect  

  3. res4: Array[Int] = Array(321222748505780889097113126130135145162169,        182230237242267271287294302305324326330351361378383384409412418,        432433485493497502512514521522531536573585595615617629640642647,    651664671673684692707716718721723736738756759788799827828833872,  898899904915916919927929951969980)  

  4. 2)val a = sc.parallelize(1 to 1002)  

  5. a.sample(true0.30).collect  

  6. res5: Array[Int] = Array(11918182426293234373842434551545660656770,     7374747585869599)  


上述例子中第一个参数withReplacementtrue时使用放回抽样(泊松抽样[1]),为false时使用不放回抽样(伯努利抽样),第二个参数fraction是百分比,第三个参数seed是种子,也就是随机取值的起源数字。从例子中还看出当选择放回抽样时,取出的元素中会出现重复值。



本文》有 0 条评论

留下一个回复