Spark一些优化

一、优化1:数据本地化

一、进程本地化级别

1.PROCESS_LOCAL:进程本地化,

代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好.

2.NODE_LOCAL:节点本地化

代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输

3.NO_PREF

对于task来说,数据从哪里获取都一样,没有好坏之分

4.RACK_LOCAL:机架本地化

数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输

5.ANY

数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差

 

二、数据本地化等待时长

spark.locality.wait,默认是3s

降级!!!!!!! 次一点的还不行,再等待3秒。
再降级!!!等待。最后不行,就用最坏的!!!!

 

三、何时需要调节

怎么调?

看运行日志!
看WebUI

我们什么时候要调节这个参数?怎么调节!

观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL

观察大部分task的数据本地化级别
如果大多都是PROCESS_LOCAL,那就不用调节了

如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长。

调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志
看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

注意

别本末倒置,本地化级别倒是提升了,
但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了。

四、怎么调节?

spark.locality.wait,默认是3s;6s,10s

默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s

spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

new SparkConf()
.set(“spark.locality.wait”, “10”)

 

四、原理浅析

Spark在Driver上,对Application的每一个stage的task进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partion;
Spark的task分配算法,优先。会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络上传输数据。

有时,事与愿违,task没有机会正好分配到数据所在的节点。

为什么呢? 可能那个节点的计算资源和计算能力都满了。
通常会等待一段时间,默认3s。(不是绝对的,还有很多种情况,)

默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,

比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。

但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,

BlockManager发现自己本地没有数据,会通过一个getRemote()方法,

通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。

对于我们来说,当然不希望是类似于第二种情况的了。

最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;

如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。

 

 

二、优化2:提高 Job 执行的并行度

1. 明确 Spark中Job 与 Streaming中 Job 的区别

1.1 Spark Core

一个 RDD DAG Graph 可以生成一个或多个 Job(Action操作)

一个Job可以认为就是会最终输出一个结果RDD的一条由RDD组织而成的计算

Job在spark里应用里是一个被调度的单位

1.2 Streaming

一个 batch 的数据对应一个 DStreamGraph

而一个 DStreamGraph 包含一或多个关于 DStream 的输出操作

每一个输出对应于一个Job,一个 DStreamGraph 对应一个JobSet,里面包含一个或多个Job

 

2. Streaming Job的并行度

Job的并行度由两个配置决定:

spark.scheduler.mode(FIFO/FAIR)

spark.streaming.concurrentJobs

一个 Batch 可能会有多个 Action 执行,比如注册了多个 Kafka 数据流,每个Action都会产生一个Job;所以一个 Batch 有可能是一批 Job,也就是 JobSet 的概念

这些 Job 由 jobExecutor 依次提交执行,而 JobExecutor 是一个默认池子大小为1的线程池,所以只能执行完一个Job再执行另外一个Job

这里说的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 决定了向 Spark Core 提交Job的并行度

提交一个Job,必须等这个执行完了,才会提交第二个

假设我们把它设置为2,则会并发的把 Job 提交给 Spark Core

Spark 有自己的机制决定如何运行这两个Job,这个机制其实就是FIFO或者FAIR(决定了资源的分配规则)

默认是 FIFO,也就是先进先出,把 concurrentJobs 设置为2,但是如果底层是FIFO,那么会优先执行先提交的Job

虽然如此,如果资源够两个job运行,还是会并行运行两个Job

 

3. spark.streaming.concurrentJobs 可让不同Batch的job同时在运行

Streaming 不仅仅能同时运行 同一个batch 的job,甚至还能同时运行不同 Batch的 Job

程序里默认:设置为6

if (userDefinedConcurrency == 1) {
   conf.set("spark.streaming.concurrentJobs", "6")

  } 

 

三、反压机制

Spark Streaming 通过 receivers (或者是 Direct 方式) 以生产者生产数据的速率接收数据。

当 batch processing time > batch interval 的时候,也就是每个批次数据处理的时间要比 Spark Streaming 批处理间隔时间长;越来越多的数据被接收,但是数据的处理速度没有跟上,导致系统开始出现数据堆积,可能进一步导致 Executor 端出现 OOM 问题而出现失败的情况。

为了解决这个问题:

对于 Receiver-based 数据接收器,我们可以通过配置 spark.streaming.receiver.maxRate 参数来限制每个 receiver 每秒最大可以接收的记录的数据;

对于 Direct Approach 的数据接收,我们可以通过配置 spark.streaming.kafka.maxRatePerPartition 参数来限制每次作业中每个 Kafka 分区最多读取的记录条数

这种方法虽然可以通过限制接收速率,来适配当前的处理能力,但这种方式存在以下几个问题:

  • 我们需要事先估计好集群的处理速度以及消息数据的产生速度;
  • 这两种方式需要人工参与,修改完相关参数之后,我们需要手动重启 Spark Streaming 应用程序;
  • 如果当前集群的处理能力高于我们配置的 maxRate,而且 producer 产生的数据高于 maxRate,这会导致集群资源利用率低下,而且也会导致数据不能够及时处理。

Spark一些优化

 

反压机制原理

Spark一些优化

  • 为了实现自动调节数据的传输速率,在原有的架构上新增了一个名为 RateController 的组件,这个组件继承自 StreamingListener,其监听所有作业的 onBatchCompleted 事件,并且基于 processingDelay 、schedulingDelay 、当前 Batch 处理的记录条数以及处理完成事件来估算出一个速率;这个速率主要用于更新流每秒能够处理的最大记录的条数。速率估算器(RateEstimator)可以又多种实现,不过目前的 Spark 2.2 只实现了基于 PID 的速率估算器。
  • InputDStreams 内部的 RateController 里面会存下计算好的最大速率,这个速率会在处理完 onBatchCompleted 事件之后将计算好的速率推送到 ReceiverSupervisorImpl,这样接收器就知道下一步应该接收多少数据了。
  • 如果用户配置了 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition,那么最后到底接收多少数据取决于三者的最小值。也就是说每个接收器或者每个 Kafka 分区每秒处理的数据不会超过 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition 的值。

 

涉及参数

  • spark.streaming.backpressure.enabled 设置为 true 开启反压
  • spark.streaming.kafka.maxRatePerPartition 每个partition每秒最多消费条数
  • spark.streaming.backpressure.rateEstimator 速率估算器类,默认值为 pid ,目前 Spark 只支持这个。

以下参数仅针对PID速率估算器设置

  • spark.streaming.backpressure.pid.proportional:用于响应错误的权重(最后批次和当前批次之间的更改)。默认值为1,只能设置成非负值。
  • spark.streaming.backpressure.pid.integral:错误积累的响应权重,具有抑制作用(有效阻尼)。默认值为 0.2 ,只能设置成非负值。
  • spark.streaming.backpressure.pid.derived:对错误趋势的响应权重。 这可能会引起 batch size 的波动,可以帮助快速增加/减少容量。默认值为0,只能设置成非负值。
  • spark.streaming.backpressure.pid.minRate:可以估算的最低费率是多少。默认值为 100,只能设置成非负值。

 

实现过程

Direc消费方式的反压逻辑

反压主要分为三个流程

1. 启动服务时注册rateCotroller

2. 监听到批次结束事件后采样计算新的消费速率

3. 提交job时利用消费速率计算每个分区消费的数据条数

以下为详细流程及各流程的说明:

 

1. 启动服务时注册rateCotroller

Spark一些优化

在程序运行到StreamingContext的start方法时会调用JobScheduler的start方法,在这里会根据消费者的不同生成不同的RateController,在kafka中生成的是DirectKafkaRateController实例。接下来会把生成的RateController注册到StreamingListenerBus中。

2. 监听到批次结束事件后采样计算新的消费速率

Spark一些优化

(1) StreamingListener的doPostEvent监听到批次结束事件后会调用RateController的onBatchCompleted方法,在此方法中会获取processingEnd(处理结束时间)、workDelay(处理耗时)、waitDelay(调度延迟)、elems(消息条数)着四个参数用于计算新的消费速率

(2) computedAndPublish方法会获取新的消费速率。

(3) computedAndPublish方法会调用RateEstimator接口的compute方法,现在spark支持的唯一实现类是PIDRateEstimator。compure方法返回的消费速率值会随着不断采样速率值趋向稳定。

(4) computedAndPublish方法调用compute方法获取到新的速率后把新的消费速率赋值到rateLimit属性中。

3. 提交job时利用消费速率计算每个分区消费的数据条数

Spark一些优化

(1) 在提交Job时会调用DirectKafkainputDStream类的compute方法获取这一批次处理KafkaRDD。

(2) compute方法会调用clamp方法获取这一批次每个partitions要消费的截止offset(取kafka最新的offset和反压计算后的offset的最小值)。

(3) clamp方法会调用maxMessagesPerPartition方法通过消费速率计算出每个partition消费的截止offset。

(4) maxMessagesPerPartition方法调用getLatestRate,获取消费速率。

(5) 通过获取的消费速率计算每个分区消费的记录数。计算方式如下:

  • 所有分区滞后offset总和为totalLag
  • 某一分区滞后offset 为lagPerPartition
  • 消费速率为 lastestRage
  • 每个分区消费条数为 Match.round(为lagPerPartition/totalLag*lastestRage)

 

spark.streaming.kafka.maxRatePerPartition
含义: 从每个kafka partition中读取数据的最大比率,条数

spark.streaming.kafka.maxRatePerPartition这个参数是控制吞吐量的,一般和spark.streaming.backpressure.enabled=true一起使用。那么应该怎么算这个值呢。

如例要10分钟的吞吐量控制在5000,0000,kafka分区是10个。

spark.streaming.kafka.maxRatePerPartition=8400这个值是怎么算的呢。

如下是公式:

spark.streaming.kafka.maxRatePerPartition(条数)的值 * kafka分区数 * (10 *60)(每秒时间) 

 

 

尽管有两个相关的配置,但是通常用户不需要对它们进行调整,因为默认值适用于大多数工作负载:

  1. spark.memory.fraction 代表整体JVM堆内存中M的百分比(默认0.6)。剩余的空间(40%)是为用户数据结构、Spark内部metadata预留的,并在稀疏使用和异常大记录的情况下避免OOM错误。
  2. spark.memory.storageFraction 代表M中R的百分比(默认0.5)。R是M中提供给缓存数据块避免受到执行驱逐的存储空间。

 

 

 

 

上一篇:Apache Flink:测试使用reduce增量聚合和windowAll操作


下一篇:Unity Render Streaming,3D模型流式云渲染的解决方案