编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

画像笔记13- 实时标签-流式计算标签开发-step1

wxchong 2024-08-01 03:01:25 开源技术 13 ℃ 0 评论

这是笔记,敲重点,我不是原创。

流式计算标签的开发

ETL任务,一般为T+1日的数据,本节内容介绍实时标签数据的开发:

做实时订单分析,或者给首次登陆APP的新人用户弹窗推送、发放红包、实时分析用户所处场景进行推送中有广泛的应用。这里使用spark streaming 开发相关的实时分析数据:

4.4.1流式标签建模框架

Spark Streaming是Spark Core API 的扩展,支持实时数据流的处理,并且有可扩展、高吞吐量、容错的特点。数据可以从Kafka、Flume 等多个来源获取,可以使用map\reduce\window等多个高级函数对业务逻辑进行处理。最后,处理后的数据被推送到文件系统、数据库等。

在内部Spark Streaming接收实时数据流并将数据分成多个batch批次,然后由Spark 引擎进行处理,批量生成结果流。Spark Streaming 提供了一个高层抽象,称为Discretized Stream 或 Dstream,它表示连续的数据流。Dstream 可以通过Kafka、Flume等来源的数据流创建,也可以通过在其它Dstream上应用高级操作来创建:



kinesis是AWS 上的数据源:

典型的Kinesis Data Streams 应用程序从数据流作为数据记录。这些应用程序可使用Kinesis 客户端库,并且可在Amazon EC2 实例上运行。

Kafka 简介

Kafka的核心功能是作为分布式消息中间件。Kafka集群由多个Broker server组成,其中,消息的发送者称为producer;消息的消费者称为consumer;Broker是消息处理的节点,多个Broker组成Kafka集群;Topic是数据主体,用来区分不同的业务系统,消费者通过订阅不同的Topic来消费不同主题的数据,每个Topic又被分为多个Partition,Partition是topic分组,每个Partition都是一个有序队列;offset 用于定位消费者在每个Partition中消费的位置。

kafka对外使用topic 概念,生产者对topic 写入消息,消费从topic中读取消息。一个Topic 由多个partition组成。生产者向Brokers指定的Topic中写入消息,消费者从Broker里面拉去指定的Topic消息,然后进行业务处理。下图表示向一个Topic中写入数据,写入的数据被追加到Partition的尾部。当Consumer消费消息时,每个Partition下的offset会按从小到大的顺序向前驱动。


在consumer消费消息时,还需要制定这个Consumer属于哪个Consumer Group(如下图所示),每个consumer Group消费一个 Topic下的所有Partition数据。每个Consumer 实例都属于一个Consumer Group,每一条消息只会被同一个Consumer Group里的一个Consumer 实例消费,不同的consumer group 可以同时消费同一条数据。开发时需要在对应的代码中指定Groupid。


Spark Streaming集成Kafka

Spark Streaming 可以通过Receiver 和 Direct 两种模式来集成Kafka。

在Receiver 模式下,Spark Streaming 作为Consumer 拉取Kafka中的数据,将获取的数据存储在Excutor 内存中。但可能会因为数据量大而造成内存溢出,所以启用预写日志机制(Write Ahead Log)将溢出部分写入到HDFS上。在接收数据中,当一个Receiver 不能及时接收所有的数据时,再开启其他Receiver接收,它们必须属于同一个Consumer Group,这样可以提高Streaming程序的吞吐量。整体来说,Receiver模式效率较低,容易丢失数据,在生产环境中使用较少。


在Direct 模式下,Spark Streaming 直接读取Kafka的topic中的所有Partition,获取Offset 信息。Spark Streaming 中有一个Inputstream,这个Dstream的每一个分区对应着Kafka中需要消费的Topic的每一个分区,并且从Kafka中读取数据。在Direct 模式下,是Spark Streaming 自己跟踪消费的Offset,消除了与Zookeeper不一致的情况,处理和输出过程符合Exactly-once模式。


对比来看,Receiver 模式是通过Zookeeper来连接kafka队列的,Direct模式则直接连接Kafka节点来获取消息。Receiver模式消费Topic中的offset是保存在Zookeeper中,Direct 模式消除了与Zookeeper不一致的情况,基于Direct模式可以使Spark Streaming应用完全达到Exacly-once语义情况。

Spark Streaming 对Kafka的集成有两个版本,一个是0.8版本,另一个是0.10以上的版本,0.10以后只保留了Direct 模式。

标签开发及工程化

实时类标签的处理流程主要包括4个部分:

  • 读取数据源,这里讲解消费Kafka中的数据;
  • 解析数据,即解析消费的kafka数据;
  • 将解析后的数据存储到指定位置(如MySQL\HDFS\HBASE等);
  • 存储消费的Offset,Direct模式下需要保存消费到的位置。
  1. 主函数逻辑:


sc = new SparkContext(sparkConf) //

ssc = new StreamingContext(sc,Seconds(5)) //时间间隔5秒

创建一个streaming context 对象,从现有sparkconf对象中创建。

这里设置batch时间间隔为5秒。


new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)

传入Kafka的Topic 从Kafka中拉取数据。

message.foreachRDD () 函数表示从message里面读取每一条rdd,然后rdd.分区偏移量.offsetRanges 读取偏移量信息。(OffsetRange包含的信息有:topic名字、分区ID、开始偏移、结束偏移)

---

在上面的程序中,将从kafka中读取的数据赋给Message,然后记录Offset的偏移信息。这里打印出Offset偏移信息包括Topic主题、分区id、开始偏移量和结束偏移量。(如下图所示)下图打印出当前消费的topic是“countly_imp”,partitionid为0-11,第三列记录的是本次消费前的偏移量,第四列记录消费后的偏移量。


将从Kafka中获取的数据进行业务处理,解析后存入指定的库表中。示例代码如下:


2. 从kafka中读取数据源:

在上面的主函数中定义了从Kafka中读取数据的方法getDirectStream:

val message = new SparkUtils(ssc).getDirectStream(ParamsUtils.kafka.KAFKA_TOPIC)

接下来通过代码介绍getDirectStream方法的实现方式。

消费者消费Kafka的Offset数据记录在Zookeeper中,在开启Streaming 程序消费Kafka 数据时,先从Zookeeper中查找最近一次消费的Offset位置,如果有记录当前Topicid+Groupid消费者Offset的位置,则从记录器开始继续消费Offset.如果没有记录,则从当前Offset最大处开始消费。代码逻辑实现如下:


--- 看到这-----------待续------

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表