学习目标

● 熟悉DataStream 程序开发流程,能够说出开发DataStream 程序的5个步骤。
● 了解DataStream 的数据类型,能够说出DataStream 程序常用的数据类型。
● 熟悉执行环境,能够在DataStream 程序中创建和配置执行环境。
● 掌握数据输入,能够灵活运用DataStreamAPI 提供的数据源算子读取数据。
● 掌握数据转换,能够灵活运用DataStreamAPI 提供的转换算子对DataStream 对象进
行转换。
● 掌握数据输出,能够灵活运用DataStreamAPI 提供的接收器算子输出DataStream 对
象的数据。
● 掌握应用案例———词频统计,能够独立实现词频统计的DataStream 程序。
Flink作为一种实时计算的流处理框架,它提供了DataStreamAPI 支持原生流处理,通过
DataStreamAPI 可以让开发者灵活且高效地编写Flink程序。不过DataStreamAPI 在设计
之初,仅用于实现处理无界流的Flink程序,即流处理程序。随着Flink逐渐向批流一体靠拢, 
DataStreamAPI 同时兼顾了实现处理有界流的Flink程序,即批处理程序。本章针对
DataStreamAPI 实现Flink程序的相关知识进行讲解。

3.aatem 
程序的开发流程
1 
DtSra

DataStreamAPI 允许在Flink中实现流处理程序,这些程序称为DataStream 程序。
DataStream 程序遵循Flink程序结构,包括Source、Transformation和Sink三部分,这三部分
共同构成了程序的执行逻辑。然而,仅凭执行逻辑还不足以让DataStream 程序正常运行。需
要在程序中创建执行环境(ExecutionEnvironment)并添加执行器(Execute)来触发程序
执行。

DataStream 程序开发流程的详细介绍如下。

1. 
创建执行环境
实现DataStream 程序的第一步是创建执行环境。执行环境类似于程序的说明书,它告知
Flink如何解析和执行DataStream 程序。

2. 
读取数据源
在创建执行环境之后,需要通过读取数据源获取数据。数据输入来源通常称为数据源。

3. 
定义数据转换
在从数据源获取数据之后,根据实际业务逻辑对读取的数据定义各种转换操作。数据转


Flink基64 础入门
换是流处理过程中的重要环节,通过对读取的数据进行转换操作,可以对数据进行清洗、加工、
重组等处理,以适应实际的业务需求。
4.输出计算结果
数据经过转换后的最终结果将被写入外部存储,以便为外部应用提供支持。
5.添加执行器
DataStream 程序开发的最后一步是添加执行器,执行器负责实际触发读取数据源、数据
转换和输出计算结果的操作。默认情况下,这些操作仅在作业中定义并添加到数据流图中,并
不会实际执行。
在DataStream 程序中添加执行器的过程相对简单,下面通过示例进行简要说明。在
DataStream 程序中,执行器的添加通过调用执行环境的execute()方法来实现,同时可以向这
个方法传递一个参数以指定作业的名称,其示例代码如下。 
executionEnvironment.execute("itcast"); 
上述示例代码中,executionEnvironment为DataStream 程序的执行环境,itcast为指定的
作业名称。需要注意的是,尽管在大多数情况下,DataStream 程序按照上述5个步骤进行开
发,但在某些特殊场景中,可以跳过定义数据转换这一步。例如,在调试DataStream 程序中读
取数据源的操作时,可以不定义数据转换,而是直接输出读取到的数据。
3.2 DataStream 的数据类型
DataStream 是Flink中用于表示数据集合的类,它是一种抽象的数据结构,实现的
DataStream 程序其实就是基于这个类的处理,通过在类中使用泛型来描述数据集合中每个元
素的数据类型。
DataStream 支持多种数据类型,可以方便地处理不同结构的数据,其中常用的数据类型
包括基本数据类型、元组(Tuple)类型和POJO 类型,具体介绍如下。
1.基本数据类型
DataStream 支持Java和Scala的基本数据类型,如整数、浮点数、字符串等。例如,定义
DataStream 的数据类型为字符串的示例代码如下。 
DataStream<String> stringStream = env.fromElements("A", "B", "C", "D"); 
上述示例代码中,env 为DataStream 程序的执行环境,fromElements()方法为
DataStream API提供的预定义数据源算子。
2.元组类型
Flink中的元组(Tuple)是一种特殊的数据类型,用于封装不同类型的元素。Flink支持
的元组类型有Tuple1 到Tuple25,分别表示包含1~25 个元素的元组。例如,定义
DataStream 的数据类型为元组的示例代码如下。 
DataStream<Tuple2<String, Integer>> tupleStream = 
env.fromElements(Tuple2.of("A", 1), Tuple2.of("B", 2)); 
上述示例代码中,元组的类型为Tuple2,表示元组具有两个元素,其中第一个元素的数据
类型为String(字符串),第二个元素的数据类型为Integer(整数)。

第3章 DataStream API 65 
3.POJO 类型
POJO(PlainOldJavaObject)是一个符合特定条件的Java类,它用于表示具有多个属性
的数据结构。在定义数据类型为POJO 的DataStream 时,需要注意以下几点。
(1)POJO 必须是公共的。
(2)POJO 具有公共的无参构造方法。
(3)POJO 的属性可以是私有的,也可以是公共的。如果属性是私有的,那么必须具有
getter()和setter()方法。
例如,定义DataStream 的数据类型为POJO 的示例代码如下。 
1 public class Person { 
2 private String name; 
3 private int age; 
4 public Person() { 
5 } 
6 public Person(String name,int age) { 
7 this.name = name; 
8 this.age = age; 
9 } 
10 public int getAge() { 
11 return age; 
12 } 
13 public void setAge(int age) { 
14 this.age = age; 
15 } 
16 public String getName() { 
17 return name; 
18 } 
19 public void setName(String name) { 
20 this.name = name; 
21 } 
22 } 
23 DataStream<Person> personStream = 
24 env.fromElements(new Person("Alice", 30), new Person("Bob", 25)); 
上述示例代码中,Person为定义的POJO,它具有两个属性name和age,其中属性name 
的数据类型为String,属性age的数据类型为int。
3.3 执行环境
执行环境在DataStream 程序中扮演着至关重要的角色,它负责任务调度、资源分配以及
程序的执行。因此,在实现DataStream 程序时,首先需要创建一个合适的执行环境,并且基于
执行环境配置DataStream 程序。同样,在工作和学习中,我们都应当考虑到个体的实际情况, 
寻找最适合个体的方法和策略,而非盲目跟随或者一刀切。
DataStream API提供了一个StreamExecutionEnvironment类用于创建和配置执行环
境,具体内容如下。
1.创建执行环境
StreamExecutionEnvironment类提供了3 种方法用于创建执行环境,它们分别是
createLocalEnvironment()、createRemoteEnvironment()和getExecutionEnvironment(),具

Flink基66 础入门
体介绍如下。
1)createLocalEnvironment() 
该方法创建的执行环境为本地执行环境,它会在本地计算机创建一个本地环境来执行
DataStream 程序,通常用于在IDE(集成开发环境)内部执行DataStream 程序。
使用createLocalEnvironment()方法创建执行环境的示例代码具体如下。 
StreamExecutionEnvironment localEnvironment = 
StreamExecutionEnvironment.createLocalEnvironment(); 
2)createRemoteEnvironment() 
该方法创建的执行环境为远程执行环境,它会将DataStream 程序提交到指定的Flink执
行,使用该方法时需要依次传入参数host和port,它们分别用于指定Flink WebUI的IP和
端口号。
使用createRemoteEnvironment()方法创建执行环境的示例代码具体如下。 
StreamExecutionEnvironment remoteEnvironment = 
StreamExecutionEnvironment.createRemoteEnvironment( 
"192.168.121.144", 
8081 
); 
上述代码中,192.168.121.144和8081分别表示FlinkWebUI的IP和端口号。
3)getExecutionEnvironment() 
该方法创建的执行环境基于运行DataStream 程序的环境,如果DataStream 程序在IDE 
运行,那么创建的执行环境为本地执行环境。如果DataStream 程序被提交到Flink运行,那
么创建的执行环境为远程执行环境。
使用getExecutionEnvironment()方法创建执行环境的示例代码具体如下。 
StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment(); 
需要说明的是,使用getExecutionEnvironment()方法创建执行环境的方式较为常用,因
为它能够根据DataStream 程序的运行环境自动创建本地执行环境或远程执行环境,使用起来
较为便捷,后续实现的DataStream 程序主要使用getExecutionEnvironment()方法创建执行
环境。
2.配置执行环境
StreamExecutionEnvironment类提供了多个方法用于配置DataStream 程序,这里介绍
基础且常用的两个方法setRuntimeMode()和setParallelism(),具体内容如下。
1)setRuntimeMode() 
该方法用于配置DataStream 程序的执行模式,DataStream 程序支持两种执行模式,分别
是流处理(STREAMING)和批处理(BATCH),其中流处理是DataStream 程序默认使用的执
行模式,用于实时处理无界流;批处理是专门用于处理有界流的执行模式。除此之外, 
DataStream 程序还支持通过自行判断选择使用的执行模式,称为自动模式,自动模式根据读
取数据源的数据是否有界,自行选择DataStream 程序使用的执行模式为流处理还是批处理。
由于DataStream 程序默认使用的执行模式为流处理,所以这里重点介绍指定批处理或自

第3章 DataStream API 67 
动模式的执行模式,在DataStream 程序中,可以调用执行环境的setRuntimeMode()方法显式
指定执行模式为批处理或自动模式,具体示例代码如下。 
//指定执行模式为批处理
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH); 
//指定执行模式为自动模式
executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
2)setParallelism() 
该方法用于配置DataStream 程序的并行度,如指定DataStream 程序的并行度为3的示
例代码如下。 
executionEnvironment.setParallelism(3); 
需要说明的是,基于执行环境配置的并行度,其优先级要高于配置文件flink-conf.yaml和
flink命令指定的并行度。除此之外,setParallelism()方法还可以为DataStream 程序中的不
同算子单独配置并行度,其优先级要高于执行环境配置的并行度。
综上所述,不同方式配置并行度的优先级从高到低依次为算子配置的并行度、执行环境配
置的并行度、flink命令配置的并行度,配置文件配置的并行度。
【注意】 算子配置的并行度会受到自身具体实现的约束,如socketTextStream 是一个非
并行的数据源算子,此时配置的并行度对该算子是无效的。
3.4 数据输入
要处理数据,就必须先获取数据。因此,在创建执行环境后,首要任务就是将数据从数据
源读取到DataStream 程序中。DataStream 程序可以从各种数据源中读取数据,并通过
DataStream API提供的算子将其转换为DataStream 对象,这些算子称为数据源算子(Source 
Operator),可以看作StreamExecutionEnvironment类提供的一类方法。
如果想要从文件、Socket或集合读取数据,那么可以直接使用DataStream API提供的预
定义数据源算子即可。如果想要从外部系统读取数据,如Kafka、RabbitMQ 等,或者读取自
定义Source的数据,那么可以使用DataStream API提供的数据源算子addSource来实现。
本节针对DataStream 程序的数据输入进行详细讲解。
3.4.1 从集合读取数据
DataStream API提供了预定义数据源算子fromCollection和fromElements,用于从集合
读取数据,具体介绍如下。
1.fromCollection 
使用预定义数据源算子fromCollection从集合读取数据时,需要传递一个集合类型的参
数,其语法格式如下。 
fromCollection(collection) 
上述语法格式中,collection用于指定集合。
2.fromElements 
使用预定义数据源算子fromElements从集合读取数据时,可以传递任意数量的Java对

Flink基68 础入门
象作为参数,Java对象可以是集合类型、数组类型、字符串类型等,其语法格式如下。 
fromElements(T,T,T,...) 
上述语法格式中,T,T,T,...表示给定的对象序列,对象序列中每个T 表示一个Java对
象。需要注意的是,对象序列中所有Java对象的类型必须相同。
接下来通过一个案例演示如何从集合读取数据。在实现本案例之前先说明相关的环境要
求,本书使用IntelliJIDEA 作为集成开发环境,并且使用JDK8构建Java运行环境,因此希
望读者在实现DataStream 程序之前,确保计算机中安装了IntelliJIDEA 和JDK8。为了后续
知识讲解便利,这里将分步骤讲解本案例的实现过程,具体步骤如下。
1)创建Java项目
在IntelliJIDEA中基于Maven创建Java项目,指定项目使用的JDK为本地安装的JDK8, 
以及指定项目名称为Flink_Chapter03。Flink_Chapter03项目创建完成后的效果如图3-1 
所示。
图3-1 Flink_Chapter03项目创建完成后的效果
2)构建项目目录结构
在Java项目的java目录中创建包cn.datastream.demo用于存放实现DataStream 程序
的类。
3)添加依赖
在Java项目的pom.xml文件中添加依赖,依赖添加完成的效果如文件3-1所示。
文件3-1 pom.xml 
1 <?xml version = "1.0" encoding = "UTF-8"?> 
2 <project xmlns = "http://maven.apache.org/POM/4.0.0" 
3 xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" 
4 xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 
5 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
6 <modelVersion>4.0.0</modelVersion> 
7 <groupId>cn.itcast</groupId> 
8 <artifactId>Flink_Chapter03</artifactId> 
9 <version>1.0-SNAPSHOT</version> 
10 <properties> 
11 <maven.compiler.source>8</maven.compiler.source> 
12 <maven.compiler.target>8</maven.compiler.target> 
13 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
14 </properties>

第3章 DataStream API 69 
15 <dependencies> 
16 <dependency> 
17 <groupId>org.apache.flink</groupId> 
18 <artifactId>flink-streaming-java</artifactId> 
19 <version>1.16.0</version> 
20 </dependency> 
21 <dependency> 
22 <groupId>org.apache.flink</groupId> 
23 <artifactId>flink-clients</artifactId> 
24 <version>1.16.0</version> 
25 </dependency> 
26 </dependencies> 
27 </project> 
在文件3-1中,第15~26行代码为添加的内容,其中第16~20行代码表示DataStream 
API的核心依赖;第21~25行代码表示Flink客户端依赖。
依赖添加完成后,确认添加的依赖是否存在于Java项目中,在IntelliJIDEA 主界面的右
侧单击Maven选项卡展开Maven窗口,在Maven窗口单击Dependencies折叠框,如图3-2 
所示。
图3-2 查看添加的依赖
从图3-2可以看出,依赖已经成功添加到Java项目中,如果这里未显示添加的依赖,则可
以单击按钮重新加载pom.xml文件。
4)实现DataStream 程序
创建一个名为ReadCollectionDemo的DataStream 程序,该程序能够从集合中读取数据
并将其输出到控制台,具体代码如文件3-2所示。
文件3-2 ReadCollectionDemo.java 
1 public class ReadCollectionDemo { 
2 public static void main(String[]args) throws Exception { 
3 //创建执行环境
4 StreamExecutionEnvironment executionEnvironment = 
5 StreamExecutionEnvironment.getExecutionEnvironment(); 
6 //指定执行模式为自动模式
7 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
8 List<Tuple2<String,Integer>> list = new ArrayList(); 
9 list.add(new Tuple2<String, Integer>("user01", 23)); 
10 list.add(new Tuple2<String, Integer>("user02", 25)); 
11 list.add(new Tuple2<String, Integer>("user03", 26));

Flink基70 础入门 
12 DataStream<Tuple2<String,Integer>> fromCollectionDataStream = 
13 executionEnvironment.fromCollection(list); 
14 DataStream<Tuple2<String, Integer>> fromElementsDataStream = 
15 executionEnvironment.fromElements( 
16 new Tuple2<String, Integer>("user04", 22), 
17 new Tuple2<String, Integer>("user05", 23), 
18 new Tuple2<String, Integer>("user06", 27)); 
19 fromCollectionDataStream.print("fromCollectionDataStream 的数据"); 
20 fromElementsDataStream.print("fromElementsDataStream 的数据"); 
21 executionEnvironment.execute(); 
22 } 
23 } 
上述代码中,第8~11行代码创建集合list,并且向集合中插入3个元素。第12、13行代
码使用预定义数据源算子fromCollection从集合list读取数据,并将其转换为DataStream 对
象fromCollectionDataStream,该对象的数据类型与集合list中元素的数据类型一致。
第14~18行代码使用预定义数据源算子fromElements从类型为Tuple2的对象序列读
取数据,并将其转换为DataStream 对象fromElementsDataStream,该对象的数据类型与对象
序列中每个对象的类型一致。
第19、20行代码使用DataStream API提供的预定义输出算子print,分别将DataStream 
对象fromCollectionDataStream 和fromElementsDataStream 的数据输出到控制台。
文件3-2的运行结果如图3-3所示。
图3-3 文件3-2的运行结果
从图3-3可以看出,fromCollectionDataStream的数据与集合list的数据相同,fromElementsDataStream 
的数据与对象序列的数据相同。因此说明,成功使用了预定义数据源算子fromCollection和
fromElements从集合读取数据。
3.4.2 从文件读取数据
DataStream API提供了预定义数据源算子readTextFile,用于从指定文件系统的文件读
取数据,如本地文件系统、HDFS等,其语法格式如下。 
readTextFile(path) 
上述语法中,参数path用于指定文件的目录。
接下来通过一个案例来演示如何从文件读取数据。本案例将分别从本地文件系统的D:\ 
FlinkData\Flink_Chapter03 目录和HDFS 的/FlinkData/Flink_Chapter03 目录读取文件
Person.csv和Person。关于这两个文件的内容如图3-4所示。

第3章 DataStream API 71 
图3-4 文件Person.csv和Person的内容
创建一个名为ReadFileDemo的DataStream 程序,该程序能够从文件Person.csv 和
Person中读取数据并将其输出到控制台,具体代码如文件3-3所示。
文件3-3 ReadFileDemo.java 
1 public class ReadFileDemo { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment(); 
5 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
6 DataStream<String> HDFSFileDataStream = 
7 executionEnvironment. 
8 readTextFile("hdfs://192.168.121.144:9820/" + 
9 "FlinkData/Flink_Chapter03/Person"); 
10 DataStream<String> LocalFileDataStream = 
11 executionEnvironment 
12 .readTextFile("D:\\FlinkData\\Flink_Chapter03\\Person.csv"); 
13 HDFSFileDataStream.print("HDFSFileDataStream 的数据"); 
14 LocalFileDataStream.print("LocalFileDataStream 的数据"); 
15 executionEnvironment.execute(); 
16 } 
17 } 
上述代码中,第6~9行代码使用预定义数据源算子readTextFile从文件Person读取
数据,并将其转换为DataStream 对象HDFSFileDataStream。第10~12行代码使用预定
义数据源算子readTextFile从文件Person.csv读取数据,并将其转换为DataStream 对象
LocalFileDataStream。
由于文件3-3实现的DataStream 程序从HDFS的文件读取数据,所以在运行文件3-3之
前,需要在Java项目的依赖管理文件pom.xml的<dependencies>标签中添加Hadoop客户
端依赖,具体内容如下。 
1 <dependency> 
2 <groupId>org.apache.hadoop</groupId> 
3 <artifactId>hadoop-client</artifactId> 
4 <version>3.2.2</version> 
5 </dependency> 
确保Hadoop处于启动状态,以及上述依赖成功添加到Java项目之后,文件3-3的运行结
果如图3-5所示。

Flink基72 础入门
图3-5 文件3-3的运行结果
从图3-5可以看出,HDFSFileDataStream的数据与文件Person的数据相同,LocalFileDataStream 的
数据与文件Person.csv的数据相同。因此说明,成功使用了预定义数据源算子readTextFile从文
件读取数据。
3.4.3 从Socket读取数据
DataStream API提供了预定义数据源算子socketTextStream,用于从Socket读取数据, 
其语法格式如下。 
socketTextStream(hostname,port) 
上述语法格式中,参数hostname用于指定Socket的主机名或IP地址;port用于指定
Socket的端口号。
接下来通过一个案例来演示如何从Socket读取数据,具体操作步骤如下。
(1)本案例通过网络工具Ncat建立TCP连接,以此来演示如何从Socket获取数据。在
虚拟机Flink01执行如下命令在线安装Ncat。 
$ yum install -y nc 
(2)在虚拟机Flink01中通过Ncat工具建立TCP连接,并指定端口号为9999,具体命令
如下。 
$ nc -lk 9999 
上述命令执行完成后的效果如图3-6所示。
图3-6 建立TCP连接
在图3-6中成功建立了TCP连接,并等待发送数据。
(3)创建一个名为ReadSocketDemo的DataStream 程序,该程序能够读取TCP连接发
送的数据并将其输出到控制台,具体代码如文件3-4所示。

第3章 DataStream API 73 
文件3-4 ReadSocketDemo.java 
1 public class ReadSocketDemo { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment(); 
5 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
6 DataStream<String> socketDataStream = executionEnvironment 
7 .socketTextStream("192.168.121.144", 9999); 
8 socketDataStream.print("socketDataStream 的数据"); 
9 executionEnvironment.execute(); 
10 } 
11 } 
第6、7行代码使用预定义数据源算子socketTextStream 从虚拟机Flink01的TCP连接
读取数据,并将其转换为DataStream 对象socketDataStream。
文件3-4的运行效果(1)如图3-7所示。
图3-7 文件3-4的运行效果(1) 
在图3-7中,当文件3-4运行完成后,会等待虚拟机Flink01的TCP连接发送数据。
在图3-6所示的界面输入“apple,banana,pear”之后按Enter键发送数据,此时查看图3-7 
所示界面,文件3-4的运行效果(2)如图3-8所示。
图3-8 文件3-4的运行效果(2) 
从图3-8可以看出,socketTextStream 的数据与TCP连接发送的数据相同。因此说明, 
成功使用了预定义数据源算子socketTextStream 从Socket读取数据。
3.4.4 从Kafka读取数据
DataStream API提供了数据源算子addSource用于从外部系统读取数据,其语法格式
如下。 
addSource(sourcefunction) 
上述语法格式中,sourcefunction用于指定实现连接外部系统的方式,不同外部系统的实
现方式有所不同。这里以常用的外部系统Kafka为例进行讲解。
Kafka是由Apache软件基金会开发的一个开源流处理平台,它是一种高吞吐量的分布式
发布订阅消息系统。在实时流处理的应用场景中,通常将Kafka和Flink进行结合使用,其中

Flink基74 础入门
Kafka负责数据的收集和传输,Flink负责从Kafka读取数据进行计算。这种结合使用不同技
术的思维方式提醒人们在面对复杂的问题和挑战时,需要将来自不同领域的知识、技能和资源
进行有效整合,通过跨界合作和交叉思维来发掘更多可能的解决方案。这种思维方式有助于
我们打破独立思考的限制,拓宽视野,发现更多创新的思路和方法。
接下来通过一个案例来演示如何从Kafka读取数据,具体操作步骤如下。
1.安装Kafka 
实现本案例的首要任务便是安装Kafka,本书使用Kafka的版本为3.3.0,这里以虚拟机
Flink01为例,演示如何安装Kafka,具体操作步骤如下。
1)上传Kafka安装包
使用rz命令将本地计算机中准备好的Kafka安装包kafka_2.12-3.3.0.tgz上传到虚拟机
的/export/software/目录。
2)安装Kafka 
使用tar命令将Kafka安装到虚拟机的/export/servers/目录,具体命令如下。 
$ tar -zxvf /export/software/kafka_2.12-3.3.0.tgz -C /export/servers/ 
3)启动Kafka内置的ZooKeeper 
Kafka的运行与ZooKeeper有着密不可分的关系,如Kafka通过ZooKeeper来存储元数
据。在实际应用场景中,Kafka通常基于独立部署的ZooKeeper来运行,不过出于便捷性考
虑,本案例使用的Kafka是基于其内置的ZooKeeper来运行的。
为了避免端口号冲突,需要提前关闭第2章在虚拟机Flink01中启动的Zookeeper服务
后,在虚拟机的/export/servers/kafka_2.12-3.3.0 目录执行如下命令启动Kafka 内置
ZooKeeper。 
$ bin/zookeeper-server-start.sh config/zookeeper.properties 
上述命令通过Kafka提供的脚本文件kafka-server-start.sh启动Kafka内置ZooKeeper。
Kafka内置ZooKeeper启动完成的效果如图3-9所示。
图3-9 Kafka内置ZooKeeper启动完成的效果

第3章 DataStream API 75 
在图3-9中,若启动信息中出现started,则证明Kafka内置ZooKeeper启动成功。需要注
意的是,Kafka内置ZooKeeper启动完成后,会占用当前操作窗口,用户无法在当前窗口进行
其他操作。如果关闭该窗口,Kafka内置ZooKeeper会停止运行。
4)启动Kafka 
启动Kafka的本质是在虚拟机中启动一个消息代理服务(BrokerService),消息代理服务
是Kafka运行的依据。在SecureCRT中,为虚拟机Flink01克隆一个新的操作窗口用于启动
Kafka。
在虚拟机的/export/servers/kafka_2.12-3.3.0目录执行如下命令启动Kafka。 
$ bin/kafka-server-start.sh config/server.properties 
上述命令通过Kafka提供的脚本文件kafka-server-start.sh启动Kafka。Kafka启动完成
的效果如图3-10所示。
图3-10 Kafka启动完成的效果
在图3-10中,若启动信息中出现started,则证明Kafka启动成功。需要注意的是,Kafka 
启动完成后,会占用当前操作窗口,用户无法在当前窗口进行其他操作。如果关闭该窗口, 
Kafka会停止运行。
5)测试Kafka 
测试Kafka的主要目的是验证Kafka生产者(producer)向指定主题(topic)发布的消息, 
是否可以被订阅该主题的Kafka消费者(consumer)所接收,具体实现过程如下。
(1)在SecureCRT中,为虚拟机Flink01克隆一个新的操作窗口用于创建主题。在虚拟
机的/export/servers/kafka_2.12-3.3.0目录执行如下命令创建主题。 
$ bin/kafka-topics.sh --create --topic kafka-source-topic \ 
--bootstrap-server 192.168.121.144:9092 
上述命令通过Kafka提供的脚本文件kafka-topics.sh操作主题,其中参数--create用于创
建主题;参数--topic用于指定主题名称,这里指定的主题名称为kafka-source-topic;参数-- 
bootstrap-server用于指定Kafka的IP地址和端口号,Kafka默认的端口号是9092。主题创

Flink基76 础入门
建完成的效果如图3-11所示。
图3-11 主题创建完成的效果
在图3-11中,若主题创建完成后出现“Createdtopickafka-source-topic.”,则证明成功在
Kafka创建名称为kafka-source-topic的主题。
(2)启动Kafka 生产者,该生产者向主题kafka-source-topic 发布消息,在/export/ 
servers/kafka_2.12-3.3.0目录执行如下命令。 
$ bin/kafka-console-producer.sh --topic kafka-source-topic \ 
--bootstrap-server 192.168.121.144:9092 
上述命令通过Kafka提供的脚本文件kafka-console-producer.sh启动Kafka生产者,其
中参数--topic用于指定主题名称;参数--bootstrap-server用于指定Kafka的IP地址和端口
号。Kafka生产者启动完成的效果如图3-12所示。
图3-12 Kafka生产者启动完成的效果
在图3-12中,当Kafka生产者启动完成后,便可以在“>”位置输入要发布到主题kafkasource-
topic的消息。
(3)在SecureCRT中,为虚拟机Flink01克隆一个新的操作窗口来启动Kafka消费者,该
消费者通过订阅主题kafka-source-topic接收Kafka生产者发布的消息。
在虚拟机的/export/servers/kafka_2.12-3.3.0目录执行如下命令启动Kafka消费者。 
$ bin/kafka-console-consumer.sh --topic kafka-source-topic \ 
--from-beginning --bootstrap-server 192.168.121.144:9092 
上述命令通过Kafka提供的脚本文件kafka-console-consumer.sh启动Kafka消费者,其
中参数--topic用于指定主题名称;参数--bootstrap-server用于指定Kafka的IP地址和端口
号。参数--from-beginning表示Kafka消费者从主题的第一条消息开始消费。Kafka消费者
启动完成的效果如图3-13所示。
在图3-13中,当Kafka消费者启动完成后,会等待Kafka生产者向主题kafka-sourcetopic
发布消息。
(4)在Kafka生产者输入“Thisismyfirstevent”之后按Enter键发布消息,此时,查看

第3章 DataStream API 77 
图3-13 Kafka消费者启动完成的效果
Kafka消费者是否可以接收消息,如图3-14所示。
图3-14 发布消息
从图3-14可以看出,Kafka消费者可以成功接收到Kafka生产者向主题kafka-sourcetopic
发布的消息Thisismyfirstevent,因此说明成功安装Kafka。
需要说明的是,若想要关闭Kafka生产者或消费者,则可以通过组合键Ctrl+C实现。
2.添加依赖
Flink提供了与Kafka建立连接的连接器,但是需要通过添加依赖才能使用。在Java项
目的依赖管理文件pom.xml的<dependencies>标签中添加以下内容,即添加与Kafka建立
连接的连接器依赖。 
1 <dependency> 
2 <groupId>org.apache.flink</groupId> 
3 <artifactId>flink-connector-kafka</artifactId> 
4 <version>1.16.0</version> 
5 </dependency> 
3.实现DataStream 程序
创建一个名为ReadKafkaDemo的DataStream 程序,该程序能够从Kafka读取数据并将
其输出到控制台,具体代码如文件3-5所示。
文件3-5 ReadKafkaDemo.java 
1 public class ReadKafkaDemo { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment(); 
5 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
6 Properties properties = new Properties(); 
7 properties.setProperty("bootstrap.servers","192.168.121.144:9092"); 
8 properties.setProperty("group.id","kafkasource"); 
9 DataStream<String> kafkaDataStream = 
10 executionEnvironment.addSource( 
11 new FlinkKafkaConsumer<>("kafka-source-topic",

Flink基78 础入门 
12 new SimpleStringSchema(), 
13 properties)); 
14 kafkaDataStream.print("kafkaDataStream 的数据"); 
15 executionEnvironment.execute(); 
16 } 
17 } 
上述代码中,第6~8行代码用于指定Kafka的配置信息,其中第7行代码用于指定
Kafka的IP地址和端口号;第8行代码用于指定Kafka消费者的GroupId。
第9~13 行代码使用数据源算子addSource 从Kafka 读取数据,并将其转换为
DataStream 对象kafkaDataStream。使用数据源算子addSource从Kafka读取数据时,需要
在数据源算子addSource中实例化FlinkKafkaConsumer类的同时传递三个参数,其中第一个
参数用于指定Kafka主题名称;第二个参数负责将接收的消息反序列化为字符串;第三个参数
用于指定Kafka的配置信息。
4.测试DataStream 程序
为了避免在运行文件3-5时,DataStream 程序将IP地址重定向至主机名,导致无法连接
到Kafka,可以在本地计算机的hosts文件中添加虚拟机Flink01的主机名和IP地址的映射。
在虚拟机Flink01分别启动Kafka内置ZooKeeper、Kafka和Kafka生产者,其中Kafka 
图3-15 此时查看文件3-5的运行结果
生产者指定的主题名称需要与文件3-5中指定的主题
名称一致。
运行文件3-5,在Kafka 生产者发布一条消息
itcast,此时查看文件3-5的运行结果,如图3-15所示。
从图3-15可以看出,kafkaDataStream 的数据与
Kafka生产者发布的消息一致。因此说明,成功使用了数据源算子addSource从Kafka读取
数据。
3.4.5 自定义Source 
如果在实际使用过程中遇到特殊需求,如需要从外部系统读取数据源,但是Flink没有提
供相应的连接器,或者预定义的数据源算子无法满足实际需求,那么可以自定义Source来解
决这个问题。自定义Source允许根据特定需求编写自己的数据源逻辑,以满足定制化的数据
读取需求。
自定义Source 时,可以通过实现SourceFunction 接口自定义生成数据的逻辑。
SourceFunction接口定义了两个方法,分别是run()和cancel(),其中run()方法用于自定义
生成数据的逻辑并发送生成的数据,在该方法中用户可以通过while循环不断地生成数据; 
cancel()方法用于终止生成数据,在该方法中用户可以设置一个特殊的标记来终止数据的生
成。关于自定义Source的程序结构如下。 
public class MySource implements SourceFunction<dataType> { 
@Override 
public void run(SourceContext<dataType> sourceContext) throws Exception{ 
} 
@Override

第3章 DataStream API 79 
public void cancel() { 
} 
}
上述程序结构中,dataType用于指定生成数据的数据类型。
接下来通过一个案例来演示如何自定义Source,具体代码如文件3-6所示。
文件3-6 MySource.java 
1 public class MySource implements 
2 SourceFunction<Tuple2<String,Integer>> { 
3 private Boolean label = true; 
4 @Override 
5 public void run(SourceContext<Tuple2<String,Integer>> sourceContext) 
6 throws Exception { 
7 Random random = new Random(); 
8 int count = 0; 
9 String[]fruits = {"apple","banana","pear","orange","cherry","grape"}; 
10 while (label){ 
11 String fruit = fruits[random.nextInt(6)]; 
12 Integer buyNum = random.nextInt(100); 
13 sourceContext.collect(new Tuple2<>(fruit,buyNum)); 
14 count++; 
15 if(count == 10){ 
16 cancel(); 
17 } 
18 } 
19 } 
20 @Override 
21 public void cancel() { 
22 label = false; 
23 } 
24 } 
上述代码中,第3行代码定义的变量label用于标记自定义数据源是否正在运行。第10~18 
行代码通过while循环来不断生成元组类型的数据,元组的第一个元素随机从数组fruits选
取,而第二个元素为100以内的随机整数,并且通过collect()方法发送生成的每条数据。第
15~17行代码用于判断生成数据的数量是否为10,当生成数据的数量等于10时,调用cancel() 
方法终止生成数据。
在DataStream 程序中,如果需要从自定义Source读取数据,只需要在数据源算子
addSource中实例化自定义Source的类即可。下面通过一个案例来演示如何从自定义Source 
读取数据,创建一个名为ReadMySource的DataStream 程序,该程序能够从自定义Source读
取数据并将其输出到控制台,具体代码如文件3-7所示。
文件3-7 ReadMySource.java 
1 public class ReadMySource { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment();

Flink基80 础入门 
5 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
6 DataStream<Tuple2<String, Integer>> mySourceDataStream = 
7 executionEnvironment.addSource(new MySource()); 
8 mySourceDataStream.print("mySourceDataStream 的数据"); 
9 executionEnvironment.execute(); 
10 } 
11 } 
上述代码中,第6、7行代码使用数据源算子addSource从自定义Source读取数据,并将
其转换为DataStream 对象mySourceDataStream,该对象的数据类型与自定义Source生成数
据的数据类型一致。
文件3-7的运行结果如图3-16所示。
图3-16 文件3-7的运行结果
从图3-16可以看出,mySourceDataStream包含10条数据,并且每条数据与自定义Source中
run()方法指定生成数据的逻辑一致。因此说明,成功使用了数据源算子addSource从自定义
Source读取数据。
3.5 数据转换
从不同的数据源读取数据之后,便可以使用DataStream API提供的算子对DataStream 
对象进行转换,这些算子称为转换算子(TransformationOperator)。转换算子可以将一个或
多个DataStream 对象转换为新的DataStream 对象,这些转换算子可以实现各种数据处理逻
辑,从而让用户可以灵活地操作数据。本节针对常见的转换算子进行介绍。
3.5.1 map 
map用于对DataStream 对象的每条数据进行转换,形成新的DataStream 对象。例如,使
用map对DataStream 对象进行转换,将每条数据乘以2,其转换过程如图3-17所示。
图3-17 map转换过程

第3章 DataStream API 81 
从图3-17可以看出,map每转换一条数据便输出一条数据作为转换结果。有关使用map 
对DataStream 对象进行转换的程序结构如下所示。 
DataStream<OutputDataType> newDataStream = dataStream.map( 
new MapFunction<InputDataType, OutputDataType>() { 
@Override 
public OutputDataType map(InputDataType inputData) throws Exception { 
return resultData; 
} 
}); 
上述程序结构中,OutputDataType用于指定转换结果的数据类型;newDataStream 用于
指定新生成DataStream 对象的名称;dataStream 用于指定转换的DataStream 对象; 
InputDataType用于指定转换的DataStream 对象的数据类型;inputData 表示转换的
DataStream 对象的每条数据;resultData用于指定数据的转换结果。
接下来通过一个案例演示如何使用map对DataStream 对象进行转换。创建一个名为
MapDemo的DataStream 程序,该程序将集合读取的每条数据乘以2,具体代码如文件3-8 
所示。
文件3-8 MapDemo.java 
1 public class MapDemo { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment(); 
5 executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); 
6 DataStream<Integer> inputDataStream = 
7 executionEnvironment.fromElements(1, 2, 3); 
8 DataStream<Integer> mapDataStream = 
9 inputDataStream.map(new MapFunction<Integer, Integer>() { 
10 @Override 
11 public Integer map(Integer inputData) throws Exception { 
12 Integer resultData = inputData *2; 
13 return resultData; 
14 } 
15 }); 
16 mapDataStream.print("mapDataStream 的数据"); 
17 executionEnvironment.execute(); 
18 } 
19 } 
图3-18 文件3-8的运行结果
上述代码中,第6、7行代码使用预定义数据源算子fromElements从类型为Integer的对
象序列读取数据,并将其转换为DataStream 对象inputDataStream,该对象的数据类型与对象
序列中每个对象的类型一致。第8~15行代码使用map对inputDataStream 进行转换,形成
新的DataStream 对象mapDataStream。
文件3-8的运行结果如图3-18所示。
从图3-18 可以看出,mapDataStream 的数据与
inputDataStream 中每条数据乘以2的结果一致。因此说

Flink基82 础入门
明,使用map成功对DataStream 对象进行转换。
3.5.2 flatMap 
flatMap用于对DataStream 对象进行扁平化处理,将DataStream 对象的每条数据按照
特定规则拆分为多条数据,再对拆分后的每条数据进行转换,形成新的DataStream 对象。例
如,使用flatMap对DataStream 对象进行转换,将每条数据通过字符“ ”拆分成多条数据,拆
分后的每条数据乘以2,其转换过程如图3-19所示。
图3-19 flatMap转换过程
从图3-19可以看出,flatMap 将每条数据拆分为两条数据作为转换结果。有关使用
flatMap对DataStream 对象进行转换的程序结构如下所示。 
DataStream<OutputDataType> newDataStream = dataStream.flatMap( 
new FlatMapFunction<InputDataType, OutputDataType>() { 
@Override 
public void flatMap( 
InputDataType inputData, 
Collector<OutputDataType> collector) throws Exception { 
collector.collect(resultData); 
} 
}); 
上述程序结构中,OutputDataType用于指定转换结果的数据类型;newDataStream 用于
指定新生成DataStream 对象的名称;dataStream 用于指定转换的DataStream 对象; 
InputDataType用于指定转换的DataStream 对象的数据类型;inputData 表示转换的
DataStream对象的每条数据;resultData用于指定数据的转换结果;collector用于收集输出的数
据,通过调用collector提供的collect()方法,可以将收集的数据输出到新生成的DataStream 
对象。接
下来通过一个案例演示如何使用flatMap对DataStream 对象进行转换。创建一个名
为FlatMapDemo的DataStream 程序,该程序将集合读取的每条数据通过分隔符“,”进行拆
分,拆分后的每条数据乘以2,具体代码如文件3-9所示。
文件3-9 FlatMapDemo.java 
1 public class FlatMapDemo { 
2 public static void main(String[]args) throws Exception { 
3 StreamExecutionEnvironment executionEnvironment = 
4 StreamExecutionEnvironment.getExecutionEnvironment();