学习目标: .了解消息队列,能够说出消息队列的主要应用场景。 .熟悉Kafka的概念,能够叙述Kafka的优点。 .熟悉Kafka的基本架构,能够说出Kafka基本架构的内容。 .掌握Kafka的工作流程,能够叙述生产者生产消息过程和消费者消费消息过程。 .掌握Kafka集群的搭建方法,能够独立完成部署Kafka集群。 .掌握Kafka的基本操作,能够使用Shel 命令和PythonAPI操作Kafka。 .掌握实时单词计数,能够从一个Topic中消费数据实现实时单词计数,并将结果发 送到另一个Topic中。 Kafka是一个基于ZooKeper系统的分布式发布订阅消息系统适用于实时计算系统。 通常情况下,使用Kafka能够构建系统或应用程序之间的数据管道,用来转换或响应实时数 据,使数据能够及时地进行业务计算,得出相应结果。本章针对消息队列简介、Kafka简介、 Kafka工作原理、Kafka集群的搭建以及Kafka的基本操作进行详细讲解。 5.消息队列简介 1 消息队列(MesageQueue,MQ)是分布式系统中的一个关键组件,用于存储消息,它的 作用是将待传输的数据存放在队列中,以便生产者和消费者可以并行地处理数据,而无须等 待对方的响应。通过消息队列,生产者可以将消息发送到队列,而消费者可以从队列中获取 消息进行处理。这种解耦的设计模式使得系统的可伸缩性和可靠性得到提高,同时也减少 了系统间的依赖性。 消息队列既然能够用来存储消息,那么消息队列的主要应用场景有哪些呢? 接下来,针 对消息队列的主要应用场景进行介绍。 (1)异步处理。 异步处理是指应用程序允许用户将一个消息放入队列中,但是应用程序并不立即处理 用户提交的消息,而是在用户需要用到该消息时应用程序再去处理。例如,用户在注册电商 网站时,在没有使用异步处理的场景下,注册流程是电商网站把用户提交的注册信息保存到 数据库中,同时额外发送注册的邮件通知以及短信注册码给用户。由于发送邮件通知和短 信注册码需要连接其对应的服务器,如果发送完邮件通知再发送短信注册码,用户就会等待 110 Spark大数据分析与应用(Python版) 较长的时间。针对上述情况,使用消息队列将邮件通知以及短信注册码保存起来,电商网站 只需要将用户的注册信息保存到数据库中便可完成注册,这样便能实现快速响应用户注册 的操作。下面,通过图5-1来了解使用异步处理前后的区别。 图5- 1 使用异步处理前后的区别 从图5-1可以看出,使用异步处理前,用户需要经历用户注册→电商网站→保存用户信 息到数据库→发送注册邮件通知→发送短信注册码5个步骤,用户注册到发送短信注册码 总共需要耗时450ms;而使用异步处理后,用户只需经历用户注册→电商网站→保存用户信 息到数据库→消息队列4个步骤,用户注册到将注册信息保存到消息队列总共需要消耗 60ms,通过对比可以发现,异步处理的注册方式要比传统注册方式响应得快。 (2)系统解耦。 系统解耦是指用户提交的请求需要与应用程序中另一个模块建立联系,两个模块之间 不会因为各自功能的问题而影响另一个模块的使用。例如,用户在电商网站购买物品并提 交订单时,订单模块会调用库存模块确认商品是否还有库存,在没有系统解耦的场景下,如 果库存模块功能出现问题,会导致订单模块下单失败,而且当库存模块的对外接口发生变 化,订单模块也依旧无法正常工作。当使用了系统解耦,订单模块便不会直接调用库存模 块,而是将订单信息保存到消息队列中,库存模块再从消息队列中获取订单信息,从而实现 订单模块与库存模块之间互不影响。下面,通过图5-2来了解使用系统解耦前后的区别。 从图5-2可以看出,使用系统解耦前,订单模块需要直接调用库存模块,而使用系统解 耦后,订单模块先将订单信息保存到消息队列中,然后库存模块在消息队列中获取订单信 息,这样订单模块与库存模块之间不会产生直接的影响。 (3)流量削峰。 流量削峰是在物品秒杀或促销等场景中,避免用户访问次数过多导致应用程序崩溃的 一种策略。它通过控制参与活动的人数,以缓解短时间内访问次数过多对应用程序造成的 压力。例如,电商网站推出物品秒杀活动,在未使用流量削峰的场景下,电商网站推出物品 秒杀活动时,大量用户会访问物品秒杀活动界面,造成该界面的访问次数过多,导致电商网 站负载过重而容易崩溃。当使用了流量削峰,物品秒杀界面在接收用户的请求后,会将用户 的请求保存到消息队列中,如果请求的数据量超过了设定的消息队列的容量,就会告知当前 第5章Kafka分布式发布订阅消息系统111 图5- 2 使用系统解耦前后的区别 活动参与人数过多,这样可以避免整个电商网站崩溃的现象。下面,通过图5-3来了解使用 流量削峰前后的区别。 图5- 3 使用流量削峰前后的区别 从图5-3可以看出,使用流量削峰前,用户直接请求秒杀界面,短时间内该界面被用户 请求的次数过多,而使用流量削峰后,用户请求被保存到消息队列中,秒杀界面在消息队列 中获取用户请求,这样能避免秒杀界面请求次数过多导致整个电商网站崩溃的现象。 了解了消息队列的应用场景,那么消息队列中的消息是如何进行传递的呢? 消息传递 一共有两种模式,分别是点对点消息传递和发布/订阅消息传递模式,关于这两种消息传递 模式的介绍如下。 (1)点对点消息传递模式 。 在点对点(PointtoPoint,P2P)消息传递模式下,消息生产者将消息发送到特定队列 , 112 Spark大数据分析与应用(Python版) 消息消费者从队列中拉取或轮询以获取消息。 点对点消息传递模式结构如图5-4所示。 图5- 4 点对点消息传递模式结构 从图5-4可以看出,生产者将消息发送到消息队列中,此时将有一个或者多个消费者会 消费消息队列中的消息,但是消息队列中的每条消息只能被消费一次,并且消费后的消息会 从消息队列中删除。 (2)发布/订阅消息传递模式。 在发布/订阅(Publish/Subscribe)消息传递模式下,消息生产者将消息发送到消息队列 中,所有消费者会即时收到并消费消息队列中的消息。 发布/订阅消息传递模式结构如图5-5所示。 图5- 5 发布/订阅消息传递模式结构 从图5-5可以看出,在发布/订阅消息传递模式结构中,生产者将消息发送到消息队 列中,此时将有多个不同的消费者消费消息队列中的消息。与点对点模式不同的是,发 布/订阅消息传递模式中消息队列的每条消息可以被多次消费,并且消费完的消息不会 立即删除。 【小提示】 点对点消息传递模式和发布/订阅消息传递模式都会采用基于拉取或推送方式传递消 息。基于拉取方式传递消息时消费者会定期查询消息队列是否有新消息,基于推送方式传 递消息时消息队列会将消息推送给已订阅该消息队列的消费者。不过在发布/订阅消息传 递模式中,常用的消息传递方式为拉取方式。 5.2 Kafka简介 Kafka是一个基于ZooKeper系统的分布式发布订阅消息系统,它使用Scala和Java 语言编写,该系统的设计初衷是为实时数据提供一个统一、高吞吐﹑低延迟的消息传递平 台。在0.10版本之前,Kafka只是一个消息系统,主要用来解决异步处理、系统解耦等问 题,在0.Kafka推出了流处理的功能, 10版本之后,使其逐渐成为了一个流式数据平台。 Kafka作为分布式发布订阅消息系统,可以处理大量的数据,并能够将消息从一个端点 传递到另外一个端点。Kafka在大数据领域中的应用非常普遍,它能够在离线和实时两种 大数据计算场景中处理数据,这得益于Kafka的优点,其优点具体如下。 第5章Kafka分布式发布订阅消息系统113 (1)高吞吐,低延迟。Kafka可以每秒处理数量庞大的消息,并且具有较低的延迟。 (2)可扩展性。Kafka是一个分布式系统,用户可以根据实际应用场景自由、动态地扩 展Kafka服务器。 (3)持久性。Kafka可以将消息存储在磁盘上,以确保数据的持久性。 (4)容错性。Kafka会将数据备份到多台服务器中,即使Kafka集群中的某台服务器 宕机,也不会影响整个系统的功能。 (5)支持多种语言。Kafka支持Java、Scala、PHP 、Python等多种语言,这使得开发人 员在不同语言环境下使用Kafka更加便捷。 在实际的大数据计算场景中,若需要对接外部数据源时,就可以使用Kafka,如日志收 集系统和消息系统,Kafka读取日志系统中的数据,每得到一条数据,就可以及时地处理一 条数据,这就是常见的流式计算框架应用场景之一。在流式计算框架中,Kafka一般用来缓 存数据,它与Apache旗下的Spark、Storm等框架紧密集成,这些框架可以接收Kafka中的 缓存数据并进行计算,实时得出相应的计算结果。 5.3 Kafka工作原理 1 Kfa的基本架构 5.3.ak 学习Kafka的基本架构对于有效地使用和管理Kafka是至关重要的。Kafka的基本架 构由Producer、Broker、Consumer和ZooKeper构成,它们之间共同协作,构建了高效、可 靠的消息处理系统。接下来,通过图5-6学习Kafka的基本架构。 图5- 6 Kafka的基本架构 1.Producer Producer作为Kafka中的生产者,主要负责将消息发送到Broker(消息代理)内部的 Topic(主题)中,在发送消息时,消息的内容主要包括键和值两部分,其中键默认为nul,值 是指发送消息的内容。除此之外,用户还可以根据需求添加属性信息。为消息指定键可以 将相同键的消息发送到相同的分区,从而保证相关消息的顺序性。 114 Spark大数据分析与应用(Python版) 2.Broker Broker作为Kafka中的消息代理,是存储和管理消息的载体,每一个Broker都可以看 作Kafka服务。存储在Broker中的消息基于Topic进行分类和组织。在Kafka中,Topic 是消息的逻辑概念,类似于一个消息类别或话题,每个Topic可以有一个或多个Partition (分区)。例如,具有3个Partition的Topic,如图5-7所示。 图5- 7 具有3个Partition的Topic 在图5-7中,Partition的标识从0开始,Producer生产的消息会被分配到不同的 Partition中,每条消息都会被分配一个从0开始具有递增顺序的ofset(偏移量),不同 Partition之间的ofset相互独立,互不影响。 Topic中的每个Partition可以存在多个副本,这些副本分布在不同的Broker上,实现 消息的备份和容错。在Kafka中,Partition分为Leader和Folower两个角色。Leader负 责接收和发送消息,而Folower作为Leader的副本则负责复制Leader的消息。这种设计 保证了在某个Broker失效时,系统依然能够确保消息的可用性和一致性。 此外,Broker还负责响应Consumer(消费者)消费消息的请求,Broker根据Consumer 提供的ofset检索Topic中相应Partition的消息,并将这些消息传递给Consumer。 3.Consumer Consumer作为Kafka中的消费者,负责消费Topic中的消息,一旦Consumer成功消 费了消息,Consumer记录自身已消费消息的ofset,并且根据配置策略手动或定期自动地 将已消费消息的ofset保存在Broker内部名为__consumer_ofsets的Topic,这确保了 Consumer即使重新消费或崩溃时,Broker能够准确地确定消息的ofset,实现从正确的位 置继续消费消息。 在Kafka中,多个Consumer可以组成特定的消费者组,消费者组之间相互独立,互不 影响,这种设计可以让多个Consumer协同处理同一个Topic中的消息,实现负载均衡。 4.ZKeeper ZooKep(o) (o) er在Kafka中负责管理和协调Broker,并且ZooKeper存储了Kafka的元数 据信息,包括Topic名称、Partition副本等。 多学一招:Kafka分区策略 生产者将消息发送到Broker内部的Topic时,如果需要确保每个Topic中的Partition负 载均衡,可以在生产者发送消息时为生产者指定相应的分区策略。Kafka中常见的分区策略 有DefaultPartitioner、RoundRobinPartitioner、StickyPartitioner和UniformStickyPartitioner,关于 这4种分区策略的介绍如下。 第5章Kafka分布式发布订阅消息系统115 1.DefaultPartitioner 该分区策略是Kafka默认的分区策略,针对消息保存到Partition时会存在3种情况, 具体如下。 (1)生产者发送消息的时候指定了Partition,则消息将保存到指定的Partition中。 (2)生产者发送消息的时候没有指定Partition,但消息的键不为空,则基于键的哈希值 来选择一个Partition进行保存。 (3)生产者发送消息的时候不但没有指定Partition,而且消息的键为空,则通过轮询的 方式将消息均匀地保存到所有Partition。这种情况下,DefaultPartitioner分区策略会基于 Partition的数量和可用性以确保消息的平均保存。 2.RoundRobinPartitioner 该分区策略是一种轮询分区策略,在保存消息时并不考虑消息中键的影响,而是通过轮询 的方式将每条消息依次发送到每个Partition,确保消息在所有Partition间按照严格的轮询顺 序分布,适用于希望均匀地保存消息以实现负载平衡,但不考虑消息的相关性或顺序性。 3.StickyPartitioner 该分区策略是一种黏性分区策略,在保存消息时需要考虑消息中键的影响,会将具有相 同键的消息保存到同一个Partition中,以保持消息的顺序性和一致性,适用于需要按照消 息的键保存到Partition后依然保持顺序,然而这种情况下会出现其中一个Partition中具有 相同键的消息比较多,而另一个Partition中具有相同键的消息比较少。 4.UniformStickyPartitioner 该分区策略是一种统一黏性分区策略,针对消息保存到Partition时会存在两种情况, 具体如下。 (1)生产者发送消息的时候指定了Partition,则消息将保存到指定的Partition中。 (2)生产者发送消息的时候没有指定Partition,但消息的键不为空,会将具有相同键的 消息保存到不同的Partition中,实现Partition负载均衡。 5..aka工作流程 32 Kf Kafka的工作流程是Kafka实现消息发送和消费的核心过程,了解Kafka的工作流程 对于理解Kafka的基本架构和性能优化有着至关重要的作用。Kafka的工作流程可以分为 生产者生产消息过程和消费者消费消息过程。 接下来,针对生产者生产消息过程和消费者消费消息过程进行详细讲解。 1. 生产者生产消息过程 Kafka生产者负责生成并发送消息到Kafka集群中的指定主题中。下面通过图5-8来 介绍生产者生产消息过程。 图5- 8 生产者生产消息过程 116 Spark大数据分析与应用(Python版) 从图5-8可以看出,生产者生产消息过程可以分为5个步骤,具体如下。 (1)Producer通过访问Broker间接获取ZooKeper中存储的元数据,包括Topic分区 分布、Leader副本位置等。 (2)Producer将消息发送给角色为Leader的Partition,与此同时,角色为Leader的 Partition会将消息写入自身的日志文件中。 (3)角色为Folower的Partition从角色为Leader的Partition中获取消息,将消息写 入自身的日志文件中,完成复制操作。 (4)角色为Folower的Partition将消息写入自身的日志文件后,会向角色为Leader 的Partition发送成功复制消息的信号。 (5)角色为Leader的Partition收到角色为Folower的Partition发送的复制消息后, 同样向Producer发送消息写入成功的信号,此时消息生产完成。 2.消费者消费消息过程 消息由Producer发送到指定Topic中角色为Leader的Partition中后,Consumer会采 用拉取模型的方式消费消息。在拉取模型下,Consumer主动向Broker发送消费消息的请 求,请求的内容包括消息的Partition、ofset等,Broker根据请求将消息返回给Consumer, Consumer消费消息后会将ofset提交给Broker,以便下次能够正确消费消息。该模型的 优势在于Consumer会记录自己的消费状态,后续Consumer可以对已消费的消息再次消 费,避免出现网络延迟或者宕机等原因造成消息消费延迟或丢失。 下面通过图5-9介绍消费者消费消息过程 。 从图5-9可以看出,消费者消费消息过程可以 分 为4个步骤,具体如下 。 (1)Consumer通过访问Broker间接获 取 ZooKeper中存储的元数据,包括Topic分区分布 、 Leader副本位置等 。 (2)Consumer根据消息的ofset,向Topic中角图5- 9 消费者消费消息过程 色为Leader的Partition发送请求消费消息。 (3)Topic中角色为Leader的Partition根据ofset将对应的消息返回给Consumer进 行消费。 (4)Consumer消费消息后,记录自己的消费状态,将已消费消息的ofset保存在 Broker内部特殊的Topic中,以便下次消费消息时能够从正确的位置开始消费。 通过本节的学习,了解到Kafka中的工作流程是由各个组成部分相互协调实现的。在 个人学习成长过程中,也应铭记协调的重要性。协调不仅能够促进团队成员之间的沟通和 协商,而且能够协调冲突和不同意见,以实现共同的学习和工作目标。 5.搭建Kaka集群 4f 学习完Kafka理论知识后,接下来讲解如何在虚拟机Hadoop1、Hadoop2和Hadoop3 中搭建Kafka集群,具体步骤如下。 第5章 Kafka分布式发布订阅消息系统1 17 1.下载Kafka安装包 本书使用的Kafka版本为3.2.1。通过Kafka官网下载Kafka安装包kafka_2.12-3.2.1.tgz。 2.上传Kafka安装包 在虚拟机Hadoop1的/export/software目录执行rz命令,将准备好的Kafka安装包 kafka_2.12-3.2.1.tgz上传到虚拟机的/export/software目录。 3.安装Kafka 使用解压操作安装Kafka,将Kafka安装到存放安装程序的目录/export/servers,在/ export/software目录执行如下命令。 $ tar -zxvf kafka_2.12-3.2.1.tgz -C /export/servers/ 4.配置Kafka环境变量 分别在虚拟机Hadoop1、Hadoop2和Hadoop3执行vi/etc/profile命令编辑系统环境 变量文件profile,在该文件的尾部添加如下内容。 export KAFKA_HOME=/export/servers/kafka_2.12-3.2.1 export PATH=:$PATH:$KAFKA_HOME/bin 成功配置Kafka环境变量后,保存并退出系统环境变量文件profile即可。不过此时在 系统环境变量文件中添加的内容尚未生效,还需要分别在Hadoop1、Hadoop2和Hadoop3 执行source/etc/profile命令初始化系统环境变量使配置的Kafka环境变量生效。 5.修改配置文件 为了确保Kafka集群能够正常启动,还需要对Kafka的配置文件进行相关的配置。执 行cd/export/servers/kafka_2.12-3.2.1/config/命令进入Kafka安装目录的config目录,在 该目录执行viserver.properties命令编辑server.properties配置文件,将server.properties 配置文件中对应的参数修改为如下内容。 broker.id=0 log.dirs=/export/data/kafka zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181 上述内容修改完成后,保存并退出server.properties配置文件。针对上述内容中的参 数进行如下讲解。 (1)broker.id:Kafka集群中每个节点的唯一且永久的ID,该值必须大于或等于0。在 本书中,虚拟机Hadoop1、Hadoop2和Hadoop3对应的broker.id分别为0,1,2。 (2)log.dirs:指定Kafka集群运行日志存放的路径。 (3)zookeeper.connect:指定ZooKeeper集群的主机名与端口号。 6.分发Kafka安装目录 执行scp 命令,将虚拟机Hadoop1 的Kafka安装目录分发至虚拟机Hadoop2 和 Hadoop3中存放安装程序的目录,具体命令如下。 # 将Kafka 安装目录分发至虚拟机Hadoop2 中存放安装程序的目录 $ scp -r /export/servers/kafka_2.12-3.2.1/ hadoop2:/export/servers/ Spark大数据分析与应用(Python1 18 版) # 将Kafka 安装目录分发至虚拟机Hadoop3 中存放安装程序的目录 $ scp -r /export/servers/kafka_2.12-3.2.1/ hadoop3:/export/servers/ 将Kafka安装目录分发完成后,分别进入虚拟机Hadoop2和Hadoop3的Kafka安装 目录的config目录,在该目录执行viserver.properties命令编辑server.properties配置文 件,将虚拟机Hadoop2的Kafka的server.properties配置文件中的broker.id修改为1,将 虚拟机Hadoop3的Kafka的server.properties配置文件中的broker.id修改为2。 7.启动ZooKeeper 在启动Kafka之前,需要先启动ZooKeeper服务,分别在虚拟机Hadoop1、Hadoop2和 Hadoop3上执行如下命令启动ZooKeeper服务。 $ zkServer.sh start 8.启动Kafka服务 这里以虚拟机Hadoop1为例,演示如何启动Kafka服务。执行cd/export/servers/kafka_ 2.12-3.2.1/命令进入虚拟机Hadoop1的Kafka安装目录,执行如下命令启动Kafka服务。 $ bin/kafka-server-start.sh config/server.properties 上述命令执行完成后,Kafka服务启动效果如图5-10所示。 图5-10 Kafka服务启动效果 从图5-10可以看出,如果SecureCRT控制台输出的消息中无异常信息,并且光标始终 处于闪烁状态,即表示Kafka启动成功。消息代理默认使用的端口号为9092。 9.查看Kafka启动状态 Kafka启动完成后,可以克隆虚拟机Hadoop1的会话框,执行jps命令查看Kafka是否 正常启动,如图5-11所示。