搜档网
当前位置:搜档网 › spark面试必问题

spark面试必问题

spark面试必问题
spark面试必问题

1、spark都有什么特色?

运行速度快:使用DAG(有向无环图)执行引擎以支持循环数据流与内存计算(当然也有部分计算基于磁盘,比如shuffle)

易用性好:支持使用Scala、Java、Python和R语言进行编程,可以通过Spark Shell进行交互式编程

通用性强:Spark提供了完整而强大的工具,包括SQL查询、流式计算、机器学习和图算法组件

随处运行:可运行于独立的集群模式中,可运行于Hadoop中,也可运行于Amazon

EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源

2、spark和hadoop有什么区别?

解决问题的出发点不一样,

Hadoop用普通硬件解决存储和计算问题,Spark用于构建大型的、低延迟的数据分析应用程序,不实现存储;Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷,Spark中间数据放到内存中,迭代运算效率高,Spark引进了弹性分布式数据集的抽象,数据对象既可以放在内存,也可以放在磁盘,容错性高,可用自动重建RDD,计算时可以通过CheckPoint来实现容错;Hadoop只提供了Map和Reduce操作,Spark更加通用,提供的数据集操作类型有很多种,主要分为:Transformations和Actions两大类

目前,Spark主要用于大数据的计算,而Hadoop以后主要用于大数据的存储(HDFS),以及资源调度(Yarn)。

3、spark和mapreduce有什么区别?

1)spark和mapreduce的shuffle相比,spark的shuffle过程不用等,用的时候再进行排序

2)spark可以把经常用到的中间数据通过RDD放入内存中,而maprduce需要通过HDFS从磁盘中取数据

3)spark会的招式多,各种算子方便计算

4)spark过度依赖内存,当内存不够的时候,就很难堪。(此原因是因为频繁GC会导致task线程等待)

4、spark生态都有那些主要组件,它们分别都解决什么问题?

spark core实现了spark的基本功能、能够进行任务调度、内存管理、错误恢复与存储系统交互等模块

spark sql用来查询数据,与数据库交互

Spark Streaming用来对实时数据进行流式计算

MLLib是机器学习功能的程序库

GraphX是用于图计算

5、spark有几种运行模式?

本地模式:Spark单机运行,一般用于开发测试。

Standalone模式:构建一个由Master+Slave构成的Spark集群,Spark运行在集群中。StandAlone模式对内存要求高

Spark on Yarn模式:Spark客户端直接连接Yarn。不需要额外构建Spark集群。spark on yarn可以提高Hadoop集群的计算速度。

Spark on Mesos模式:Spark客户端直接连接Mesos。不需要额外构建Spark集群。

6、spark架构由哪几部分组成?每部分有什么作用?分别对应yarn的什么组件?

Driver:任务发起者,申请资源,启动任务,对应yarn中的AM+Client

Master:管理Worker,调度资源,对应yarn中的RM

Work:报告资源使用情况,启动Executor,对应yarn中的NM

Executor:用来执行任务,对应yarn中的yarnchild

7、说说spark任务执行流程?

1)Driver端提交任务,向Master申请资源

2)Master与Work进行RPC通信,让Work启动Executor

3)Executor启动会主动连接Drive,通过Drive->Master->Work-Executor从而得

到Driver在哪里

4)Driver会产生Task,提交给Executor中启动Task去做真正的计算

8、sparkHA的自动切换模式是借助什么实现的?它在spark中有什么作用?

Zookeeper的高可用功能

1)选举

2)保存alive的Master信息

3)保存worker的信息

9、什么是RDD?它具有什么主要特性?

RDD是分布式对象集合,本质上是一个只读的弹性分区记录集合,RDD基于稳定的物理存储中的数据集来创建或者通过在其他RDD上执行转换操作创建新的RDD。RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型。

1)高效的容错性。为了实现容错,节点之间会有大量数据传输,必然会出现数据丢失的情况,当需要恢复数据时,只需要通过不同RDD之间依赖的血缘关系重新计算丢失的分区来实现容错,无需回滚整个系统;并且RDD的血缘关系记录的不是具体数据,是转换操作,降低了容错开销。

2)转化结果持久到内存。多个RDD之间转化的数据存在内存,避免了磁盘IO开销

3)可以直接存JAVA对象,减少序列化导致的开销。

10、spark是如何容错的?

Lineage机制和checkpoint机制

RDD的不同依赖关系导致spark对不同的依赖关系有不同的处理方式。 对于窄依赖而言,由于窄依赖的一个RDD 分区最多对应一个子RDD分区,在此情况下出现计算结果丢失,由于计算结果只依赖父RDD相关数据有关,所以不需要计算全部数据,只需计算部分数据即可。而宽依赖实质是指一个父RDD的分区会对应一个或多个子RDD多个分区,在此情况下,丢失一个子RDD分区重算的每一个父RDD的每一个分区的全部数据并非都给丢失的子RDD分区用的,会有一部分数据相当于相应的是未丢失的子RDD分区中须要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此假设使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖

加Checkpoint是最物有所值的。

通过上述分析能够看出在下面两种情况下,RDD须要加checkpoint。

1)DAG中的Lineage过长,假设重算,则开销太大(如在PageRank中)。

2)在宽依赖上做Checkpoint获得的收益更大。

checkpoint(本质是通过将RDD写入Disk做检查点)是为了lineage做容错的辅

助。lineage过长会造成容错成本过高。这样就不如在中间阶段做检查点容错,假设之后有节点出现故障而丢失分区。从做检查点的RDD開始重做Lineage,就会降低开销。

11.一个rdd的多少个task是由什么决定的?一个spark的job能同时并行运行多少个任务是由什么决定的?

由分区决定的,一个分区对应一个task;core决定。

12、RDD在spark中的运行流程?

1)创建RDD对象;

2)SparkContext负责计算RDD之间的依赖关系,构建DAG(有向无环图);

3)DAGScheduler负责把DAG分解成多个阶段,每个阶段中包含了多个任务,每个任务会被TaskScheduler任务调度器分发给各个工作节点(Worker Node)上的Executor去执行。

13、rdd的foreach的foreachPartition有什么不同?groupby和reduceby有什么区别?

foreach是在每个分区中直接对iterator运行foreach操作

foreachPartition是在每个分区中把iterrator传入func函数,让func自己对iterator进行处理

reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因

为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD 通过map进行自定义函数操作。

在对大数据进行复杂计算时,reduceByKey优于groupByKey。

14、rdd的缓存级别有那些?persist和cache模式是什么级别的缓存?

MEMORY_ONLY 未序列化,内存中的持久化,内存不够就不持久化

MEMORY_AND_DISK 未序列化,内存中的持久化,内存不够就将数据放入磁盘

MEMORY_ONLY_SER 序列化后的内存持久化,内存不够就不持久化

MEMORY_AND_DISK_SER 序列化后的内存持久化,内存不够就不持久化,内存不够就将数据放入磁盘

DISK_ONLY 未序列化,数据全部写入磁盘

MEMORY_ONLY_2 将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上

persist和cache模式都是MEMORY_ONLY级别的缓存

可以用persist() 用来设置RDD的存储级别

15、spark存储体系中的BlockManagerMaster在spark架构中的哪一部分?有什么作用?

在driver的DAGScheduler中,BlockManagerMaster包含BlockMangerinfo,

而BlockMangerinfo内部包含BlockStatus,BlockManagerMaster对各节点上的BlockManager内部的元数据进行维护,比如block增删改操作,都会在这维护元数据的变量。

16、BlockManagerMaster如何维护BlockManager内部的元数据?

只要BlockManger执行了数据的增删改的操作,那么必须将Block的BlockStatus上报

到BlockMangerMaster上去,在BlockMangerMaster上,会对指定的BlockManger的BlockMangerInfo内部BlockStatus进行增删改的操作,从而达到元数据维护的功能。

17、blockManager中的

diskStore、memoryStore、connectionManager、blockManagerWorker都有什么作

用?BolckManagerMaster里面存储的是什么?

DiskStore:对磁盘上的文件进行读写

MemoryStore:对内存上的数据进行读写

ConnectionManger:与其他节点的BlockManger建立网络连接

BlockMangerWork:与其他节点的BlockManger数据读写

BolckManagerMaster存储BlockManger对应的BlockMangerinfo和BlockStatus信息,并进行维护

18、使用广播变量有什么优点?(为什么要使用广播变量)

当RDD的操作要使用driver中定义的变量时,每次操作driver都要把变量发送给worker 节点一次,如果这个变量的数据很大的话,会产生很高的负载,导致执行效率低;使用广播变量可以高效的使一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输

19、使用广播变量和外部变量会有什么不同?

广播变量将一个只读的变量缓存在每台机器上,对worker节点只传输一次,每次操

作executor获取本地内存的广播变量数据副本

每个Task都有一个外部变量,对worker节点传输多次,每个executor拿多个外部变量的值,executor修改后,worker拿不到外部变量的值

20、2.1.5以前版本是默认开启consolidation机制吗?consolidation机制开启与关闭的shuffle 有什么不同?

不开启

关闭时,因为map端磁盘文件太多,导致耗费大量的性能在磁盘文件的创建和写上,造成了大量的IO,严重影响性能,此时每个节点的文件数量等于shuffe map task的数量乘以下一个stage的reducer task数量,文件数量繁多;

开启时,每次只能拉到指定缓存大小的数据量,拉取完了之后再聚合处理,然后再拉取,开启后shuffle map端写磁盘文件的数量会大大减少,此时每个节点的文件数量等于cpu 的core数乘以下个stage的reducer task数量,文件数量会大大减少。

21、spark shuffle过程中导致shuffle output file lost是什么原因?如何优化?

reduce task拉文件时遇到map task的executor的jvm正在full gc时会等待,然后请求重启3次每次等待5秒继续拉取,如果此时仍然拉取不到就导致等待时间过长造成拉取文件失败。

开启consolidation后,调优GC,调优程序,增加等待时间或次数。

22、spark中都支持那些序列化方法?默认是那种?这种默认方法相比java的序列化方法有什么有点?它有几种使用方法分别是?

序列化:把对象转换为字节序列的过程称为对象的序列化。

反序列化:把字节序列恢复为对象的过程称为对象的反序列化。

Spark提供的两种序列化机制

Spark实际上提供了两种序列化机制,它只是默认使用了第一种:

1)Java序列化机制:默认情况下,Spark使用Java自身的ObjectInputStream和ObjectOutputStream机制进行对象的序列化。只要你的类实现了Serializable接口,那么都是可以序列化的。而且Java序列化机制是提供了自定义序列化支持的,只要你实现Serializable接口即可实现自己的更高性能的序列化算法。Java序列化机制的速度比较慢,而且序列化后的数据占用的内存空间比较大。

2)Kryo序列化机制:Spark也支持使用Kryo类库来进行序列化。Kryo序列化机制比Java序列化机制更快,而且序列化后的数据占用的空间更小,通常比Java序列化的数据占用的空间要小10倍。Kryo序列化机制之所以不是默认序列化机制的原因是,有些类型虽然实现了Seriralizable接口,但是它也不一定能够进行序列化;此外,如果你要得到最佳的性能,Kryo还要求你在Spark应用程序中,对所有你需要序列化的类型都进行注册。 kryo序列化更快且比Java占用空间更小,需要事先在应用程序中注册类,但并不支持所有类的序列化,例如匿名内部类因为不能注册,所以不能用Kryo序列化。

val classes: Array[Class[_]] = Array[Class[_]]

(classOf[ORCUtil],classOf[StructObjectInspector],classOf[OrcStruct])

conf.set("spark.serializer", classOf[KryoSerializer].getName)

conf.set("spark.kryo.registrationRequired","true")

conf.registerKryoClasses(classes)

如何使用kryo序列化机制?

首先要用SparkConf设置一个参数,使用new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")即可,即将Spark的序列化器设置

为KryoSerializer,然后对需要序列化的类进行注册。

使用kryo序列化需要注意什么?

1、如果注册的要序列化的自定义的类型,本身特别大,比如包含了超过100个field。那

么就会导致要序列化的对象过大。此时就需要对Kryo本身进行优化。因为Kryo内部的缓存可能不够存放那么大的class对象。此时就需要调用SparkConf.set()方法,设

置spark.kryoserializer.buffer.max参数的值,将其调大。

2、预先注册自定义类型

23.详细说明一下GC对spark性能的影响?

GC会导致spark的性能降低,spark中的task运行时是工作线程,GC是守护线程,守护线程运行时,会让工作线程停止,所以GC运行的时候,会让Task停下来,这样会影响spark 程序的运行速度,降低性能。

默认情况下,Executor的内存空间分60%给RDD用来缓存,只分配40%给Task运行期间动态创建对象,这个内存有点小,很可能会发生full gc,因为内存小就会导致创建的对象很快把内存填满,然后就会GC了,就是JVM尝试找到不再被使用的对象进行回收,清除出内存空间。所以如果Task分配的内存空间小,就会频繁的发生GC,从而导致频繁的Task工作线程的停止,从而降低Spark应用程序的性能。

24、如何解决因为频繁GC导致spark性能降低的问题?

1)可以用通过调整比例,比如将RDD缓存空间占比调整为40%,分配给Task的空间变为了60%,这样的话可以降低GC发生的频率。(new

SparkConf().set("spark.storage.memoryFraction", "0.5"))

2)降低RDD的使用内存的空间,比如调节序列化的级别为MEMORY_DISK_SER

或MEMORY_SER,让RDD的Partition序列化成一个字节的数组

3)使用Kryo序列化类库进行序列化

25、说说JVM的minor gc与full gc原理

1)创建的对象首先放入Eden和Survivor1(可能是短时间)

2)当Eden满了会启动minor gc,回收新生代中不再使用的对象,还要用的就放

入Survivor2中

3)移完之后eden和Survivor1中剩下的就是不再使用的对象,就将他们清理掉

4)Survivor1和Survivor2交换角色。那就是原来的Survivor1成了备用的了,也就是原来的Survivor2

5)多次在Survivor区没有被清理掉的,说明它是长时间使用的,那么将它移动到老年

代,到目前为止世界一切和平

6)由于对象越New越多,minor时发生备用的Survivor区满了,放不进去了,怎么办呢?这个本来可能是短时间生存的对象被放入老年代

7)短时间生存的对象,很可能快速的给老年代占满,白白的浪费老年代的空间,就会触发Full GC,回收老年代的对象

26、spark2以前的版本yarn模式分为那2种?这两种方式都有什么不同?

yarn-client:主要用于测试,driver在本地客户端,集群产生超大量的网络通信时容易网卡流量过大,导致任务失败,这种方式的好处是执行时本地可以看到所有log方便调试。

yarn-cluster,用于生产环境,因为driver运行NM上,没有网卡流量的问题,缺点在于,调试不方便,本地用spark-submit提交之后,看不到log,只能通过yarn app logs这种命令查看,很麻烦

27、map-reduce中数据倾斜的原因?应该如何处理?

比如说作业中大部分都完成了,但是总有几个reduce一直在运行。

这是因为这几个reduce中的处理的数据要远远大于其他的reduce,可能是因为对键值对任务划分的不均匀造成的数据倾斜。

解决的方法可以在分区的时候重新定义分区规则对于value数据很多的key可以进行拆分、均匀打散等处理,或者是在map端的combiner中进行数据预处理的操作。

28、DataFrame.DataSet.RDD有什么相同与不同?它们之间怎么互相转换?

相同点:

1)三者都是spark中的弹性分布式数据集,为处理超大数据而提供便利

2)三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action,如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过

3)三者都可以缓存运算

4)三者都有partition的

5)三者有许多共同的函数,如filter,排序等

6)在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持,import

spark.implicits._

7)DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

不同点:

1)RDD不支持sparksql操作

2)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值没法直接访问

3)DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作

4)DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然,利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定

5)Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型可以不同,DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用getAS方法或者模式匹配拿出特定字段

rdd转df,ds:创建DateSet DataFrame,先引用 import spark.implicits._ 然

后rdd.toDS rdd.toDF

29、介绍一下Spark Streaming的内部处理机制?

Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。

对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型。

30、storm与spark streming有什么共同点和区别

1)处理模型以及延迟

虽然这两都具有可扩展性和可容错性,但是处理模型是完全不一样的。storm实现亚秒级别的延时处理,每次处理一条event,而spark streaming是在短暂时间处理多条event。

2)容错和数据保证

两者都有容错时的数据保证,storm的每条记录在移动过程中都被跟踪,保证每条记录最少被处理一次,是细粒度的;spark streaming通过批处理级别对记录追踪,保证每个批处理记录仅仅被处理一次,是粗粒度的。

3)批处理框架集成

spark streaming是在spark框架上运行,可以使用spark的方法来写spark streaming程序,正是因为spark streaming是批处理的,所以它的吞吐量远高于storm,storm每条数据实时处理所以开销较大

4)生产支持

两者都可以在各自的集群框架中运行,storm是mesos上运行,spark streaming可以

在yarn和mesos上运行

31、spark streaming的优缺点?

优点

1)批处理吞吐量大,速度快

2)容错性好,不添加代码和配置就能恢复丢失的工作

3)后台是spark,生态圈强大,社区活跃度高。

4)数据源广泛

缺点

500毫秒的延迟在对高时效性的系统中来说过高

32、说说spark streaming接收数据后的处理流程?

Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。

Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的

DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底

层RDD的操作。

Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。

33、为什么spark streaming的程序至少要2个CPU core资源?

因为receiver会占用一个CPU core,省下的一个或多个core是留给处理数据的executor 用的,所以在使用spark-submit提交任务的时候设置cpu的资源参数一定大于1,否则就会提示资源不够,启动不了。

34、如何合理的利用集群的资源提高Spark应用程序的性能?

spark官方推荐设置集群总CPU数量的2到3倍的并行度,也就是task的数量设置成CPU Core数量的2到3倍,这样的话,每个CPU Core可能会分配到并发运行2到3个task线程,那么集群资源,就不太可能出现空闲的情况,而会连续运行,发挥最好的效率。

35、spark中数据本地化是什么?有哪几种级别?

数据本地化,指的是,数据离计算它的代码有多近,数据本地化对于spark job性能有着巨大的影响,如果数据以及要计算的多代码是在一起的,那么性能自然会非常高,但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上,通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。spark也正是基于这个数据本地化的原则来构建task调度算法的。

基于数据距离代码的距离,有几种数据本地化级别:

1)PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。

2)NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。

3)NO_PREF:数据从哪里过来,性能都是一样的。

4)RACK_LOCAL:数据和计算它的代码在一个机架上。

5)ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。

36、spark如何对数据本地化进行优化?

Task要处理的partition的数据,在某一个executor中,然后taskScheduler首先会尽量用最好的本地化级别去启动task,会尽量在那个包含了要处理的partition的executor中去启动task,spark倾向于使用这种最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么spark就会放大本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么久分配task过去,第二,立即在任意一个executor上启动一个task。

spark默认会等待一会,来期望task要处理的数据所在的节点上的executor空闲出一

个CPU,从而将task分配过去,只要超过了时间,那么spark就会将task分配到其他任意一个空闲的executor上。

可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫

秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。

37、说说reduceByKey和groupByKey的区别?

如果能用reduceByKey,那就用reduceByKey,因为它会在map端,先进行本

地combine,可以大大减少要传输到reduce端的数据量,减小网络传输的开销。仅仅是key 对应的values进行聚合为一个值的场景,那么用reduceByKey是非常合适的,因为会先

在ShuffleMapTask端写入本地磁盘文件的时候进行本地的聚合,这样写入的磁盘文件的数据量将大幅度的缩减几倍到几十倍。同样传输到reduceTask端的数据也会减少、时间也会缩短几倍到几十倍;groupByKey的性能,相对来说要差很多,因为它不会在本地进行聚合,而是原封不动,把ShuffleMapTask的输出,拉取到ResultTask的内存中,所以这样的话,就会导致,所有的数据,都要进行网络传输从而导致网络传输性能开销非常大!

38、如何处理spark中的数据倾斜?

1)可以用hive将发生倾斜的key做聚合

2)进行数据的清洗,把发生倾斜的刨除,用单独的程序去算倾斜的key

3)提高shuffle的并行度,用随机前缀,方法是打上随机前缀先聚合一次,然后去掉随机前缀再聚合一次。适用场景groupby

4)指定“倍数”的数据扩容加上随机“倍数”值前缀。适用场景join

5)join时小数据join大数据,就用mapjoin

6)能不shuffle就不shuffle

39、spark streaming如何对数据接收调优?

1)创建多个DStream和Receive。每一个输入DStream都会在某个Worker的Executor 上启动一个Receiver,该Receiver接收一个数据流。因此可以通过创建多个输入DStream,并且配置它们接收数据源不同的分区数据,达到接收多个数据流的效果,提高数据吞吐量,最后针对多个DStream,使用union进行聚合形成一个DStream。

2)调节block interval。通过参数,spark.streaming.blockInterval,可以设置block

interval,默认是200ms。

3)使用Kryo序列化机制来序列化task,可以减小task的大小,从而减少发送这些task到各个Worker节点上的Executor的时间。

4)提高集群的利用率,调节全局的spark.default.parallelism参数或者在reduceByKey 等操作中,传入第二个参数。

40、spark streaming如何对数据处理并行度调优?

如果在计算的任何stage中使用的并行task的数量没有足够多,那么集群资源是无法被充分利用的。举例来说,对于分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,默认的并行task的数量是由spark.default.parallelism参数决定的。你可以在reduceByKey等操作中,传入第二个参数,手动指定该操作的并行度,也可以调节全局的spark.default.parallelism参数。

例如conf.set("spark.default.parallelism","100")

41、spark streaming如何数据序列化调优

数据序列化造成的系统开销可以由序列化格式的优化来减小。在流式计算的场景下,有两种类型的数据需要序列化。

1)输入数据:默认情况下,接收到的输入数据,是存储在Executor的内存中的,使用的持久化级别是StorageLevel.MEMORY_AND_DISK_SER_2。这意味着,数据被序列化为字节从而减小GC开销,并且会复制其它executor进行失败的容错。因此,数据首先会存储在内存中,然后在内存不足时会溢写到磁盘上,从而为流式计算来保存所有需要的数据。这里的序列化有明显的性能开销——Receiver必须反序列化从网络接收到的数据,然后再使用Spark的序列化格式序列化数据。

2)流式计算操作生成的持久化RDD:流式计算操作生成的持久化RDD,可能会持久化到内存中。例如,窗口操作默认就会将数据持久化在内存中,因为这些数据后面可能会在多个窗口中被使用,并被处理多次。然而,不像Spark Core的默认持久化级

别,StorageLevel.MEMORY_ONLY,流式计算操作生成的RDD的默认持久化级别

是StorageLevel.MEMORY_ONLY_SER ,默认就会减小GC开销。

在上述的场景中,使用Kryo序列化类库可以减小CPU和内存的性能开销。使用Kryo 时,一定要考虑注册自定义的类,并且禁用对应引用的

tracking(spark.kryo.referenceTracking)。

在一些特殊的场景中,比如需要为流式应用保持的数据总量并不是很多,也许可以将数

据以非序列化的方式进行持久化,从而减少序列化和反序列化的CPU开销,而且又不会有太昂贵的GC开销。举例来说,如果你数秒的batch interval,并且没有使用window操作,那么你可以考虑通过显式地设置持久化级别,来禁止持久化时对数据进行序列化。这样就可以减少用于序列化和反序列化的CPU性能开销,并且不用承担太多的GC开销。

42、如何对spark streaming的batch interval调优?

如果想让一个运行在集群上的Spark Streaming应用程序可以稳定,它就必须尽可能快地处理接收到的数据。换句话说,batch应该在生成之后,就尽可能快地处理掉。对于一个应用来说,可以通过观察Spark UI上的batch处理时间来定。batch处理时间必须小于batch interval时间。

基于流式计算的本质,batch interval对于,在固定集群资源条件下,应用能保持的数据接收速率,会有巨大的影响。例如,在WordCount例子中,对于一个特定的数据接收速率,应用业务可以保证每2秒打印一次单词计数,而不是每500ms。因此batch interval需要被设置得,让预期的数据接收速率可以在生产环境中保持住。

为你的应用计算正确的batch大小的比较好的方法,是在一个很保守的batch interval,比如5~10s,以很慢的数据接收速率进行测试。要检查应用是否跟得上这个数据速率,可以检查每个batch的处理时间的延迟,如果处理时间与batch interval基本吻合,那么应用就是稳定的。否则,如果batch调度的延迟持续增长,那么就意味应用无法跟得上这个速率,也就是不稳定的。因此你要想有一个稳定的配置,可以尝试提升数据处理的速度,或者增

加batch interval。记住,由于临时性的数据增长导致的暂时的延迟增长,可以合理的,只要延迟情况可以在短时间内恢复即可。

42、spark streaming内存调优

Spark Streaming应用需要的集群内存资源,是由使用的transformation操作类型决定的。举例来说,如果想要使用一个窗口长度为10分钟的window操作,那么集群就必须有足够的内存来保存10分钟内的数据。如果想要使用updateStateByKey来维护许多key的state,那么你的内存资源就必须足够大。反过来说,如果想要做一个简单的map-filter-store 操作,那么需要使用的内存就很少。

通常来说,通过Receiver接收到的数据,会使

用StorageLevel.MEMORY_AND_DISK_SER_2持久化级别来进行存储,因此无法保存在内存中的数据会溢写到磁盘上。而溢写到磁盘上,是会降低应用的性能的。因此,通常是建议为应用提供它需要的足够的内存资源。建议在一个小规模的场景下测试内存的使用量,并进行评估。

内存调优的另外一个方面是垃圾回收。对于流式应用来说,如果要获得低延迟,肯定不想要有因为JVM垃圾回收导致的长时间延迟。之前的GC优化有讲过

1、DStream的持久化:输入数据和某些操作生产的中间RDD,默认持久化时都会序列化为字节。与非序列化的方式相比,这会降低内存和GC开销。使用Kryo序列化机制可以进一步减少内存使用和GC开销。

2、进一步降低内存使用率,可以对数据进行压缩,由https://www.sodocs.net/doc/e47030490.html,press参数控制(默认false)。

如何让数据保存时间更长:默认情况下,所有输入数据和通过DStream transformation 操作生成的持久化RDD,会自动被清理。Spark Streaming会决定何时清理这些数

据,Spark Streaming会根据使用的transformaction来决定何时清理数据。举个例子,如果你使用一个10分钟的窗口,那么程序会保留10分钟的数据,然后自动的清理老数据。当然你可以通过设置streamingContext.remember参数来让数据保留更长的时间。当然你的内存得足够允许这么做的情况下。

43、spark-streaming中每个batch有多少个partition怎么计算的?

batch interval/ block interval

44、spark常用的端口有哪些

8080、4040和18080,18080是历史服务端口

45、receiver和direct方式有什么区别?

receiver把固定间隔的数据放在内存中,使用kafka高级的API,自动维护偏移量,达到固定的时间一起处理每个批次的数据,效率低且容易丢数据,因为数据在内存中,为了容错,还得WALS.

Direct直连方式,相当于直接连接到kafka的分区上,一个RDD的分区对应一个Kafka的分区,使用Kfaka底层的API去读取数据,效率高,但需要自己去维护偏移量(在kakfa 0.10版本之前,但是不管什么版本最好把偏移量存到可靠的外部存储系统中)

46、常用的Transformation和Action算子

Transformation:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,

sortByKey,cogroup,join,mapPartitions,mapPartitionsWithIndex,union,intersection ,distinct,repartition

Action:reduce,collect,count,first,take,saveAsTextFile,saveAsSequenceFile,sa veAsObjectFile,countByKey,foreach

相关主题