博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka集成SparkStreaming
阅读量:6746 次
发布时间:2019-06-25

本文共 13799 字,大约阅读时间需要 45 分钟。

hot3.png

Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.10代理兼容,但0.10集成与早期的代理不兼容。

注意:从Spark 2.3.0开始,不推荐使用Kafka 0.8支持。

Spark Streaming从Kafka接收数据,转换为spark streaming中的数据结构Dstream。数据接收方式有两种 :1 使用Receiver接收的旧方法:2使用Direct拉取的新方法(在Spark 1.3中引入)。

Receiver方式

     Received是使用Kafka高级Consumer API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark Executor的内存中,然后由Spark Streaming启动的job来处理数据。然而默认配置下,这种方式可能会因为底层的失败而丢失数据(请参阅)。如果要启用高可靠机制,确保零数据丢失,要启用Spark Streaming的预写日志机制(Write Ahead Log,(已引入)在Spark 1.2)。该机制会同步地将接收到的Kafka数据保存到分布式文件系统(比如HDFS)上的预写日志中,以便底层节点在发生故障时也可以使用预写日志中的数据进行恢复。

如下图:

接下来,我们将讨论如何在流应用程序中使用此方法。

1 链接 

对于使用Maven项目定义的Scala / Java应用程序时,我们需要添加相应的依赖包:

org.apache.spark
spark-streaming-kafka_2.11
1.6.3

2 编程 

在流应用程序代码中,导入KafkaUtils并创建输入DStream,如下所示。

Scala编程:

import org.apache.spark.streaming.kafka._   val kafkaStream = KafkaUtils.createStream(streamingContext,      [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Java编程

import org.apache.spark.streaming.kafka.*; JavaPairReceiverInputDStream
kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

还有几个需要注意的点:

  • Kafka中topic的partition与Spark Streaming中生成的RDD的partition无关,因此,在KafkaUtils.createStream()中,增加某个topic的partition的数量,只会增加单个Receiver消费topic的线程数,也就是读取Kafka中topic partition的线程数量,它不会增加Spark在处理数据时的并行性。
  • 可以使用不同的consumer group和topic创建多个Kafka输入DStream,以使用多个receiver并行接收数据。
  • 如果已使用HDFS等复制文件系统启用了“预读日志”,则接收的数据已在日志中复制。因此,输入流的存储级别的存储级别StorageLevel.MEMORY_AND_DISK_SER(即,使用KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。

3 部署

与任何Spark应用程序一样,spark-submit用于启动应用程序。但是,Scala / Java应用程序和Python应用程序的细节略有不同。

对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将spark-streaming-kafka_2.11其及其依赖项打包到应用程序JAR中。确保spark-core_2.10spark-streaming_2.10标记为providedSpark安装中已存在的依赖项。然后使用spark-submit启动应用程序。

对于缺少SBT / Maven项目管理的Python应用程序,spark-streaming-kafka_2.11可以直接将其依赖项添加到spark-submit使用中--packages。那是,

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3 ...

另外,您也可以下载Maven构件的JAR spark-streaming-kafka-assembly从 ,并将其添加到spark-submit--jars

Direct方式

在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。其形式如下图:

这种方法相较于Receiver方式的优势在于:

  • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
  • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

请注意,此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。但是,您可以在每个批处理中访问此方法处理的偏移量,并自行更新Zookeeper。

接下来,我们将讨论如何在流应用程序中使用此方法。

1 链接

org.apache.spark
spark-streaming-kafka-0-10_2.11
2.3.1

2 编程

请注意,导入的命名空间包括版本org.apache.spark.streaming.kafka010

Scala编程

复制代码

import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeval kafkaParams = Map[String, Object](  "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092",  "key.deserializer" -> classOf[StringDeserializer],  "value.deserializer" -> classOf[StringDeserializer],  "group.id" -> "use_a_separate_group_id_for_each_stream",  "auto.offset.reset" -> "latest",  "enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("topicA", "topicB")val stream = KafkaUtils.createDirectStream[String, String](  streamingContext,  PreferConsistent,  Subscribe[String, String](topics, kafkaParams))stream.map(record => (record.key, record.value))

复制代码

流中的每个项目都是,有关可能的kafkaParams,请参阅。如果Spark批处理持续时间大于默认的Kafka心跳会话超时(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批次,这将需要在代理上更改group.max.session.timeout.ms。请注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅。

3 Direct方式案例

复制代码

package com.xyg.spark import kafka.serializer.{StringDecoder, Decoder}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkContext, SparkConf} import scala.reflect.ClassTag /**  * Created by Administrator on 2018/7/28.  */object SparkStreamDemo {  def main(args: Array[String]) {     val conf = new SparkConf()    conf.setAppName("spark_streaming")    conf.setMaster("local[*]")     val sc = new SparkContext(conf)    sc.setCheckpointDir("D:/checkpoints")    sc.setLogLevel("ERROR")     val ssc = new StreamingContext(sc, Seconds(5))     // val topics = Map("spark" -> 2)     val kafkaParams = Map[String, String](      "bootstrap.servers" -> "node21:9092,node22:9092,node23:9092",      "group.id" -> "spark",      "auto.offset.reset" -> "smallest"    )    // 直连方式拉取数据,这种方式不会修改数据的偏移量,需要手动的更新    val lines =  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("spark")).map(_._2)    // val lines = KafkaUtils.createStream(ssc, "node21:2181,node22:2181,node23:2181", "spark", topics).map(_._2)     val ds1 = lines.flatMap(_.split(" ")).map((_, 1))     val ds2 = ds1.updateStateByKey[Int]((x:Seq[Int], y:Option[Int]) => {      Some(x.sum + y.getOrElse(0))    })     ds2.print()     ssc.start()    ssc.awaitTermination()   }}

复制代码

Spark向kafka中写入数据

上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不同,Spark并没有提供统一的接口用于写入Kafka,所以我们需要使用底层Kafka接口进行包装。

最直接的做法我们可以想到如下这种方式:

复制代码

input.foreachRDD(rdd =>  // 不能在这里创建KafkaProducer  rdd.foreachPartition(partition =>    partition.foreach{      case x:String=>{        val props = new HashMap[String, Object]()        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,          "org.apache.kafka.common.serialization.StringSerializer")        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,          "org.apache.kafka.common.serialization.StringSerializer")        println(x)        val producer = new KafkaProducer[String,String](props)        val message=new ProducerRecord[String, String]("output",null,x)        producer.send(message)      }    }  ))

复制代码

但是这种方式缺点很明显,对于每个partition的每条记录,我们都需要创建KafkaProducer,然后利用producer进行输出操作,注意这里我们并不能将KafkaProducer的新建任务放在foreachPartition外边,因为KafkaProducer是不可序列化的(not serializable)。显然这种做法是不灵活且低效的,因为每条记录都需要建立一次连接。如何解决呢?

1.首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下:

复制代码

import java.util.concurrent.Futureimport org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {  /* This is the key idea that allows us to work around running into     NotSerializableExceptions. */  lazy val producer = createProducer()  def send(topic: String, key: K, value: V): Future[RecordMetadata] =    producer.send(new ProducerRecord[K, V](topic, key, value))  def send(topic: String, value: V): Future[RecordMetadata] =    producer.send(new ProducerRecord[K, V](topic, value))}object KafkaSink {  import scala.collection.JavaConversions._  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {    val createProducerFunc = () => {      val producer = new KafkaProducer[K, V](config)      sys.addShutdownHook {        // Ensure that, on executor JVM shutdown, the Kafka producer sends        // any buffered messages to Kafka before shutting down.        producer.close()      }      producer    }    new KafkaSink(createProducerFunc)  }  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)}

复制代码

2.之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下:

复制代码

// 广播KafkaSinkval kafkaProducer: Broadcast[KafkaSink[String, String]] = {  val kafkaProducerConfig = {    val p = new Properties()    p.setProperty("bootstrap.servers", Conf.brokers)    p.setProperty("key.serializer", classOf[StringSerializer].getName)    p.setProperty("value.serializer", classOf[StringSerializer].getName)    p  }  log.warn("kafka producer init done!")  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}

复制代码

这样我们就能在每个executor中愉快的将数据输入到kafka当中:

复制代码

//输出到kafkasegmentedStream.foreachRDD(rdd => {  if (!rdd.isEmpty) {    rdd.foreach(record => {      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)      // do something else    })  }})

复制代码

Spark streaming+Kafka应用

一般Spark Streaming进行流式处理,首先利用上文我们阐述的Direct方式从Kafka拉取batch,之后经过分词、统计等相关处理,回写到DB上(一般为Hbase或者Mysql),由此高效实时的完成每天大量数据的词频统计任务。

Spark streaming+Kafka调优

Spark streaming+Kafka的使用中,当数据量较小,很多时候默认配置和使用便能够满足情况,但是当数据量大的时候,就需要进行一定的调整和优化,而这种调整和优化本身也是不同的场景需要不同的配置。

1 合理的批处理时间(batchDuration)

几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。而且,一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,如下图:

2 合理的Kafka拉取量(maxRatePerPartition重要)

对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time,如下图:

3 缓存反复使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数,如下图:

4 设置合理的GC

长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

5 设置合理的CPU资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

6 设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。

在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

7 使用高性能的算子

这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后进行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

8 使用Kryo优化序列化性能

这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。

在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

// 创建SparkConf对象。val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

结果

经过种种调试优化,我们最终要达到的目的是,Spark Streaming能够实时的拉取Kafka当中的数据,并且能够保持稳定,如下图所示:

当然不同的应用场景会有不同的图形,这是本文词频统计优化稳定后的监控图,我们可以看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都能够在这一虚线下处理完毕,说明整体Spark Streaming是运行稳定的。

 

spark从kafka获取数据两种方式

1.kafkaUtils.createStream
利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工作节点的执行器中,但是默认配置下这种方法在 Spark Job 出错时会导致数据丢失,如果要保证数据可靠性,需要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的 Kafka 用来保证数据可靠性和一致性的数据保存方式。可以选择让 Spark 程序把 WAL 保存在分布式文件系统(比如 HDFS)中, 通过WAL 和checkPiont可以保证数据的安全性 但是效率很低 因为读取数据时需要往文件系统中存储一份,大量的磁盘Io和网络带宽会限制性能,如果数据不需要保证完全安全 可以考虑使用 另外一种

在 源码中可以看出一共有五种创建方法。但是底层最后调用的 是其中kafkaParams主要的参数为zk 参数 groupId 默认会添加链接kafka的超时时间为10000 

可以看出 首先进行判断是否使用wal机制。返回值为KafkainputDstream在KafkaInputDstream中 会根据是否使用wal 创建kafkaReceiver 和ReliableKafkaReceiver  两者的区别就是 后者会storeBlock保证数据的安全 
此种方式是走zookeeper的 会将offset存放在zookeeper中

1.kafkaUtils.createDirectStream

好处:
1、并行 高并发,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
2、高效,这种方式并不需要WAL,如果需要保证数据安全可以通过checkPoint 
3、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。这种通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具
* checkPoint 1.恢复Driver(元数据)2.恢复数据(offset)
在源码 : 返回一个DirectKafkaInputDstream
如果想要实现往zookeeper中手动添加offset      http://www.tuicool.com/articles/vaUzquJ(别人的)
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics) 
没有formOffsets和messageHandler    两个构造参数 默认会创建

转载于:https://my.oschina.net/hblt147/blog/2876679

你可能感兴趣的文章
VMware宣布与华云数据签署合作备忘录
查看>>
Human-like learning在对话机器人中的魔性运用
查看>>
《企业级ios应用开发实战》一2.7 在 iPhone 上运行应用程序
查看>>
数据中心布线系统构成及不同规模范例
查看>>
辉伦太阳能向美国Citizens Energy供应28MW光伏组件
查看>>
CVPR 主席专访:如何享受CVPR这场盛会? | CVPR 2017
查看>>
大数据引领地产业转型精准运营
查看>>
传闻称高通骁龙830将首次用上10nm制程
查看>>
《JavaScript和jQuery实战手册(原书第2版)》——2.9节教程:编写使用数组的页面...
查看>>
受制于人所以看病贵!中国95%的专利药和医疗设备被国外公司垄断
查看>>
2017年会成为比WiFi快10倍的“WiGig技术”元年吗?
查看>>
巴西咖啡合作社使用RFID系统,管理咖啡豆库存
查看>>
甲骨文与富士通利用SPARC架构打造M12超级设备
查看>>
新加坡云技术备份服务提供商Dropsuite 获500万美元融资 计划在澳大利亚上市
查看>>
微软打算带着R语言搞仕途?
查看>>
“微信”影响力报告:用数据读懂微信五大业务
查看>>
定位物联网保险 久隆财险盯上深耕装备制造领域
查看>>
研究员发现iOS企业漏洞 苹果表示这是“钓鱼”
查看>>
用大数据解决“痛点”问题 “云上贵州”再发力
查看>>
政企携手升级智慧城市建设
查看>>