The Dormouse's story
5. 6.Once upon a time there were three little sisters; and their names were 7. Elsie, 8. Lacie and 9. Tillie; 10. and they lived at the bottom of a well.
11. 12.…
13. """引入BeautifulSoup,并使用soup的prettify方法查看是否正确载入: 1. from bs4 import BeautifulSoup 2. soup= BeautifulSoup(html_doc) 3. print(soup.prettify()) 一些简单的使用样例: 1. soup.title 2. #The Dormouse's story
15. 16. soup.p\['class'\] 17. # u'title' 18. 19. soup.a 20. # Elsie 21. 22. soup.find_all('a') 23. # \[Elsie, 24. #Lacie, 25. #Tillie\] 26. 27. soup.find(id="link3") 28. # Tillie3) 抓取新浪新闻页面并解析title和提取链接1. import requests 2. from bs4 import BeautifulSoup 3. 4. url= "https://news.sina.com.cn" 5. html= requests.get(url) 6. soup= BeautifulSoup(html.content, 'lxml') 7. print(soup.title) 8. 9. for link in soup.select("div.ct_t_01 h1 a"): 10. print(link.get("href"))3.2Apache Kafka 在第2章的图2.2计算广告系统架构中,可发现有数据管道模块。一般在以往的程序设计中,这个模块很少会被使用: 前端页面交互的数据或者后台程序产生的数据会直接写入文件或者数据库当中,而不是写入一个数据管道。那么为什么需要这个模块呢? 因为在大数据的场景下,数据时常是大规模高并发产生的,可以想象春节前的12306应用或者双十一期间的淘宝网站。一瞬间产生的海量数据是非常难被快速处理并记入数据库中的。如果不想丢失数据,就需要在数据的产生方和使用方之间构建一个缓冲器,让这个缓冲器承受大数据的压力,从而让使用方可以自如地进行数据处理而不是将数据丢掉。这个数据产生方一般称为生产者,使用方一般称为消费者。 同时,一个复杂的系统拥有众多的数据生产者和消费者,需要一个消息的发布和订阅机制代替点对点的消息传输。生产者只关心往队列里写消息,消费者只关心从队列里读消息即可。这种方式在系统架构设计中起到了模块间解耦、流量削峰和异步处理的作用。 可以说,Apache Kafka就是为上述需求设计的,但可以提供的能力远不止消息的发布和订阅。如官网所说,Apache Kafka是一个分布式的流式处理平台,具有高性能、持久化、多副本备份、横向扩展能力。作为一个流式处理平台,Apache Kafka具备以下3个特点。 (1) 发布和订阅消息。 (2) 具备消息的存储和容错能力。 (3) 即时处理消息。 3.2.1系统架构 图3.3Apache Kafka系统架构Apache Kafka通过将生产者(producer)、代理(broker)和消费者(consumer)分布在不同的节点(机器)上,构成分布式系统架构如图3.3所示。主题(topic)是Kafka提供的高层抽象,一个主题就是一个类别或者一个可订阅的数据名称。生产者可以向一个主题推送消息,消费者以组为单位,可以关注并读取自己感兴趣的消息。Kafka通过ZooKeeper实现了对生产者和代理的全局状态信息的管理及其负载均衡。 3.2.2消息、主题和Schema 在Kafka的定义中,消息是一个数据单元,主题是一个数据流的类别或者名称。如果把Kafka看作一个关系数据库,那么消息可以看成是数据库里的一个数据行或一条记录。相应地,一个主题就可以理解为数据库中的一张表,主题的名称就是表名。与大多关系数据库不同的是,在Kafka中消息并没有字段类型,仅被当作字节数组进行处理和保存。除了被表示成字节数组的数据,消息还可以有一个可选的元信息,称为主键。对于Kafka来说,主键也是字节数组,用来控制Kafka数据写入不同分区的过程。最简单的情况就是为主键生成一个连续的散列值,根据散列值对分区总数取模进行分区选择,这样可以保证具有同样主键的消息被写入相同的分区中。 为了提升数据写入的效率,Kafka采用批量写入方式。一个批次就是一组消息集合,这些消息会被写入一个主题和分区。批次写入的方式减少了大量网络I/O开销,但是需要在时间延迟和吞吐量之间做出权衡。一个批次中数据量越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。一般一个批次中的数据会被压缩,进一步节省了数据传输和存储的成本,但是需要在解压中耗费一定的计算量。 Kafka中的消息格式为没有语义的字节数组,因此建议使用Schema描述消息的内容,让消息更容易理解。根据应用需求,Schema有许多可以选择的方式,例如JSON(JavaScript Object Notation)和XML格式,简单易用,可读性很好。不过JSON和XML缺乏强类型的识别,不同版本之间的兼容性不高。在Kafka社区,Apache Avro是非常受欢迎的一种序列化框架。Apache Avro最初是为Hadoop设计的,提供了一种紧凑的序列化格式,Schema和消息数据是解耦的,当Schema发生变化后,不需要重新格式化数据。同时,Apache Avro还支持强类型,发行版本也进行了兼容。对于Kafka开发,保持Schema一致的重要性不言而喻,因为它保证了在写入数据和读取数据时不会因格式不同而造成冲突。 3.2.3分区 每一个分区(partition)是一个有序列表,写入的数据会按照顺序排列,其中的每一个元素都按照顺序被标记上了id,称为偏移量(offset)。不同于其他消息中间件,Kafka中的消息即使被消费了,消息也不会被立即删除。日志文件将会根据broker中的配置要求,保留一定的时间之后再删除。如log文件保留两天,两天后文件会被清除,无论其中的消息是否被消费。Kafka通过这种简单的手段释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘开支。 分区的目的有多个,最根本原因是Kafka基于文件存储。通过分区,可以将日志内容分散到多个磁盘上,避免文件尺寸达到单机磁盘的上限,每个分区都会被当前服务器(Kafka实例)保存;可以将一个主题切分成任意多个分区,提高消息保存和消费的效率,如图3.4所示。 图3.4Kafka的主题 3.2.4生产者与消费者 生产者创建消息,并把消息发布到一个或多个特定的主题上。一般情况下,生产者默认把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过生产者也可以把消息直接写到指定的分区。这通常是通过消息键和分区器实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。 消费者读取数据,可以订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在ZooKeeper或Kafka上,如果消费者关闭或者重启,它的读取状态不会丢失。 图3.5为Kafka生产者和消费者。 图3.5Kafka 生产者和消费者 在计算广告系统中,保存广告的点击信息就是一个典型应用场景。在这个场景里,用户和网站的交互数据会通过生产者实时写入Kafka中。生产者不需要关心Kafka的数据多久之后会被读取出来用于进行数据分析,它只需要保证每次用户的点击行为都被正确记录。图3.6展示了向Kafka发送消息的主要步骤。 图3.6Kafka数据写入流程 首先,创建一个ProducerRecord对象,该对象需要包含目标主题和要发送的内容,键或分区作为可选内容。生产者在发送ProducerRecord对象时,要先把对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前在ProducerRecord对象里指定了分区,那么分区器就直接返回之前指定的分区;如果之前没有指定分区,那么分区器会根据ProducerRecord对象的键选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的代理上。 服务器在收到这些消息时会进行响应: 如果消息成功写入Kafka中,服务器就返回一个RecordMetadata对象,它包含了主题和分区信息,以及记录在分区里的偏移量;如果写入失败,服务器则会返回一个错误信息。生产者在收到错误信息后会尝试重新发送消息,几次之后如果还是失败(次数为生产者配置中的retries值),就返回错误信息。 多个消费者可以组成一个消费群组,也就是说,会有一个或多个消费者共同读取一个主题。消费群组保证每个分区只能被一个消费者使用。通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,消费群组里的其他消费者可以接管失效消费者的工作,如图3.7所示。 图3.7Kafka消费者与消费群组 对于消费者而言,它需要保存消费消息的偏移量,该偏移量的保存和使用由消费者完全控制;当消费者正常消费消息时,偏移量将会向前驱动,即消息将按照顺序依次被消费。事实上,消费者可以使用任意顺序消费消息,它只需要将偏移量设置为任意值。 Kafka集群几乎不需要维护任何消费者和生产者的状态信息,这些信息由ZooKeeper保存。因此,生产者和消费者的实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。 应用程序从Kafka中读取消息的时候需要使用Kafka消费者进行主题订阅。假设有一个应用程序需要从一个Kafka主题读取消息并验证这些消息,然后把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息并保存结果。过了一段时间,假如生产者往主题写入消息的速度超过了应用程序验证数据的速度,这时候就需要对消费者进行横向扩展。就像多个生产者可以向相同的主题写入消息一样,也可以使用多个消费者从同一个主题读取消息,对消息进行分流。 Kafka消费者从属于消费群组。一个消费群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。假设主题T1有4个分区,创建了消费者C1,它是消费群组G1里唯一的消费者,用它订阅主题T1。消费者C1将收到主题T1全部4个分区的消息,如图3.8所示。 图3.8一个消费者 如果在消费群组G1里新增一个消费者C2,那么每个消费者将分别从两个分区接收消息。假设消费者C1接收分区0(partition 0)和分区2(partition 2)的消息,消费者C2接收分区1(partition 1)和分区3(partition 3)的消息,如图3.9所示。 图3.9两个消费者 如果消费群组G1有4个消费者,那么每个消费者可以分配到一个分区,如图3.10所示。 图3.104个消费者如果往消费群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息,如图3.11所示。 图3.11更多的消费者 在消费群组里增加消费者是横向扩展消费能力的主要方式。Kafka的消费者经常会做一些高延迟的操作,如把数据写到数据库或HDFS上,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向扩展的主要手段。为主题创建大量的分区非常必要,在负载增长时可以加入更多的消费者。不过要注意的是,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。 除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上,Kafka设计的主要目标之一就是要让Kafka主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向扩展Kafka消费者和消费群组并不会对性能造成负面影响。 在上面几个例子里,如果新增一个只包含一个消费者的消费群组G2,那么这个消费者将从主题T1上接收所有的消息,与群组G1之间互不影响。消费群组G2可以增加更多的消费者,每个消费者可以消费若干个分区,就像消费群组G1那样。总的来说,消费群组G2还是会接收到所有消息,不管有没有其他群组存在。 简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费群组,然后往群组里添加消费者扩展读取能力和处理能力,消费群组里的每个消费者只处理一部分消息。 3.2.5代理 一个Kafka服务称为一个代理(broker),它同时为生产者和消费者提供服务。 broker接收生产者的写入数据请求,接收消息并为消息设置偏移量后把消息保存到磁盘中。 broker接收消费者读取分区的请求,返回已经存储的消息给消费者。 Kafka在设计之初就考虑将多个broker组合起来作为一个集群使用。在每个集群中,都有一个broker充当集群控制器的角色,这个broker是自动从集群中的活跃成员中选举出来的。作为控制器的broker负责集群的管理工作,包括将分区分配给其他broker以及监控其他broker的工作状态。 在集群中,一个分区从属于一个broker,这个broker称为该分区的首领(leader)。但分区可以分配给多个broker,这样就完成了分区数据的复制。这种复制机制为分区提供了数据冗余,如果有一个broker失效,其他broker可以接管失效broker下的分区。不过,相关的生产者和消费者都要重新连接到新的首领。 相比其他消息中间件,Apache Kafka的一个重要特性是可以持久化一段时间的信息。Kafka broker可以为主题配置一个默认的保留时间(例如,7天)或者一个默认的存储空间(例如,1GB空间)。当消息保留时间超过7天或者消息存储的消息超过1GB空间时,旧消息就会过期并被删除,所以在任意时刻,可用消息的总量都不会超过配置参数所指定的大小。每一个主题都可以设置自己的保留策略,如跟踪用户活动的数据可能需要保留几天,而程序指标只需要保留几小时。一些特殊场景,主题可以设定为根据key只保留最后一个消息。 3.2.6Kafka关键特性〖*2〗1. 持久化Kafka根据设置的保留规则进行数据保存,同时每个主题可以单独设置保留规则,因此Kafka可以满足不同消费者的使用需求。消费者不需要担心因为处理速度过慢或遇到流量高峰而导致无法及时读取消息。即使消费者关闭链接,消息仍然会继续保留在Kafka中,消费者可以从上次中断的地方继续处理消息。 2. 扩展性 为了能够轻松地处理大数据,Kafka从一开始就被设计成一个具有灵活伸缩性的系统。用户在开发阶段可以先使用单个broker,再扩展到包含3个broker的小型开发集群,然后随着数据量的不断增长,部署到生产环境的集群可能包含上百个broker。对在线集群进行扩展丝毫不影响整体系统的可用性。也就是说,一个包含多个broker的集群,即使个别broker失效,仍然可以持续地为客户提供服务。要提高集群的容错能力,需要配置较高的复制系数。 3. 高性能 通过横向扩展生产者、消费者和代理,Kafka可以轻松地处理巨大的信息流。在处理大量数据的同时,它还能保证亚秒级的消息延迟。 4. 分区再均衡 分区的所有权从一个消费者转移到另一个消费者,这样的行为称为再均衡。再均衡非常重要,它为消费群组带来了高可用性和扩展性。在再均衡期间,消费者无法读取消息,造成整个消费群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。 消费者通过向被指派为消费群组协调器的broker发送心跳来维持它们和消费群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,消费群组协调器认为它已经死亡,就会触发一次再均衡。 5. 提交和偏移量 每次调用poll方法,它总是返回生产者写入Kafka,但是还没有被消费者读取过的记录,因此可以追踪到哪些记录是被消费群组里的哪个消费者读取的。Kafka不会像其他JMS队列那样需要得到消费者的确认,相反,消费者可以使用Kafka追踪消息在分区里的位置,或者说是偏移量。 消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入消费群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个分区。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会被重复处理;如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。 6. 复制 复制功能是Kafka架构的核心。在Kafka的文档里,Kafka把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。复制的关键之处在于如果个别节点失效仍能保证Kafka的可用性和持久性。 Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。这些副本被保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本。 副本有以下两种类型。 (1) leader副本: 每个分区都有一个leader副本。为了保证一致性,所有生产者请求和消费者请求多会经过这个副本。 (2) follower副本: leader以外的副本都是follower副本。follower副本不处理来自客户端的请求,它们唯一的任务就是从leader那里复制消息,保持与leader一致的状态。如果leader发生崩溃,其中一个follower会被提升为新leader。 leader的另一个任务是搞清楚哪个follower的状态与自己是一致的。follower为了保持与leader的状态一致,在有新消息到达时尝试从leader那里复制消息,不过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢,broker发生崩溃导致复制滞后,直到重启broker后复制才会继续。 为了与leader保持同步,follower向leader发送获取数据请求,这种请求和消费者为了读取消息而发送的请求是一样的。leader将响应消息发送给follower。请求消息里包含了follower想要获取消息的偏移量,而且这些偏移量总是有序的。 7. 可靠性 ACID是关系数据库普遍支持的可靠性标准,其中ACID表示原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。关系数据库只有满足ACID标准,才能确保应用程序安全。我们知道数据库系统承诺可以做到什么,也知道在不同条件下它们会发生怎样的行为。 Kafka可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入,那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B。 只有当消息被写入分区的所有同步副本时,它才被认为是已提交的。生产者可以选择接收不同类型的确认,如在消息被完全提交时的确认,或者在消息被写入leader副本时的确认,或者在消息被发送到网络时的确认。只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。消费者只能读取已经提交的消息。 这些基本的保证机制可以用来构建可靠的系统,但仅仅依赖它们是无法保证系统完全可靠的。构建一个可靠的系统需要做出一些权衡,Kafka管理员和开发者可以在配置参数上做出权衡,从而得到他们想要达到的可靠性。这种权衡一般是指消息存储的可靠性和一致性的重要程度与可用性、高吞吐量、低延迟和硬件成本的重要程度之间的权衡。 3.2.7项目实践4: 通过Kafka进行数据处理 广告系统产生大量线上展示数据,如果数据直接写入HDFS,Hadoop Session无法承受,而且会严重影响Hadoop性能。因此系统会通过消息中间件作为缓存。本项目通过Python实现KafkaProducer和KafkaConsumer。 本实践中,模拟一个简单的广告业务数据的收集与处理系统。 1. Kafka管理 1) 启动服务 Kafka使用ZooKeeper进行配置管理,因此启动Kafka Server之前需要先启动ZooKeeper Server。命令如下: > bin/zookeeper-server-start.sh config/zookeeper.properties \[2019-03-12 15:11:30,836\] INFO Reading configuration from: config/zookeeper .properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)如果启动后出现java.net.BindException: Address already in use这样的错误,说明ZooKeeper要使用的2181端口已经被占用。这时可以通过如下命令查看2181端口被哪个进程占用: > lsof -i:2181 COMMANDPIDUSERFDTYPEDEVICE SIZE/OFF NODE NAME java17177 zookeeper41uIPv4435517580t0TCP :eforward (LISTEN)从上面的信息可以看到已经有一个ZooKeeper在运行了,这是因为启动的Hadoop也使用了ZooKeeper,这样就不需要再次启动ZooKeeper了。但是如果该端口号不是被ZooKeeper占用,就需要考虑是kill占用的进程释放端口号,还是通过修改配置让ZooKeeper服务使用其他端口号。ZooKeeper的端口号配置在config/zookeeper.properties中,设置方法在行“clientPort=2181”。 确认了ZooKeeper正常启动后,开始启动Kafka Server:> bin/kafka-server-start.sh config/server.properties \[2019-03-12 15:13:19,621\] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)注意: 如果在启动ZooKeeper时修改了端口号,那么也需要在Kafka Server的配置文件config/server.properties中进行对应修改,设置方法在行“zookeeper.connect=localhost: 2181”。 2) 创建topic 使用Kafka的第一件事情就是创建一个topic,使用如下命令创建一个名字为test的topic:> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".之后可以通过下面的命令查看topic是否已经成功创建。> bin/kafka-topics.sh --list --zookeeper localhost:2181 test3) 发送数据和消费数据 有了topic后,就可以使用一个简单的脚本向Kafka的topic中写入数据,下面的命令启动了一个生产者,等待用户输入。> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test输入数据: This is a message This is another message写入数据后,可以启动一个消费者从topic中从头读取数据,命令如下: