Contents

Flink basics

Flink 基础

Flink组件栈

/images/2022-08-11-Flink基础知识/1

1. DataSet API

Transformation Description
Map 读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function:
dataStream.map{ x => x*2 }
FlatMap 读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function:
dataStream.flatMap{ str => str.split(" “) }
Filter 对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter:
dataStream.filter{ _!=0 }
MapPartition 单个函数调用中转换并行分区。函数将分区作为一个“迭代器”,并可以产生任意数量的结果值。每个分区中的元素数量取决于并行度和以前的操作。
data.mapPartition{ in=>Some(in.size) }
Reduce 将当前元素与上一个reduce后的值进行合并,再返回新合并的值。
data.reduce{ _+_ }

2. DataStream API

Transformation Description
Map
DataStream → DataStream
读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function:
dataStream.map{ x => x*2 }
FlatMap
DataStream → DataStream
读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function:
dataStream.flatMap{ str => str.split(” “) }
Filter
DataStream → DataStream
对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter:
dataStream.filter{ _!=0 }
KeyBy
DataStream→KeyedStream
逻辑上将流分区为不相交的分区,每个分区包含相同key的元素。在内部通过hash分区来实现。关于如何指定分区的keys请参阅keys。该transformation返回一个KeyedDataStream。
dataStream.keyBy(“someKey”)
dataStream.keyBy(0)
Reduce
KeyedStream→DataStream
在一个KeyedStream上不断进行reduce操作。将当前元素与上一个reduce后的值进行合并,再返回新合并的值。—个构造局部求和流的reduce function :
keyedStream.reduce { _+_ }
Aggregations
KeyedStream→DataStream
在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。
keyedStream.sum(0) keyedStream.sum(“key”) keyedStream.min(0) keyedStream.min(“key”)
keyedStream.max(0) keyedStream.max(“key”) keyedStream.minBy(0) keyedStream.minBy(“key”)
keyedStream.maxBy(0) keyedStream.maxBy(“key”)
Windows
KeyedStream→WindowedStream
Windows可定义在已分区的KeyedStreams上。Windows会在每个key对应的数据上根据一些特征(例如,在最近5秒内到达的数据)进行分组。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))

3. Source API

  • readTextFile(path)
  • socketTextStream
  • fromCollection
  • DataStream Connector

时间

  1. 事件时间:事件创建的时间。通常由事件中的时间戳描述,例如Kafka消息中生成的时间戳。
  2. 摄入时间:事件进入Flink数据流运算符的时间。
  3. 处理时间:每一个执行时间操作的算符的本地时间。

/images/2022-08-11-Flink基础知识/2

并行度

Flink程序由多个任务组成(source、transformation和sink)。一个任务有多个并行的实例(线程)来执行,一个任务的并行实例(线程)数目就被称为该任务的并行度。

  • 算子级别: 设置flink的编程API修改
  • 运行环境级别: 设置executionEnvironment的方法修改并行度
  • 客户端级别 : $FLINK_HOME/bin/flink 的-p参数
  • 系统修改: 修改$FLINK_HOME/conf/flink-conf.yaml文件

注:

  • 算子级别>运行环境级别>客户端级别>系统级别

  • 并行度不能大于Slot个数

1. 程序结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//create a TableEnvironment
//for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//register a Table
tableEnv.registerTable("table1",...);		//or
tableEnv.registerTableSource("table2",...);	//or
tableEnv.registerExternalCatalog("extCat",...);

//create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
//create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...");

//emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);

//execute
env.execute();

2. Table和SQL的算子操作

API算子 Description
Scan
DataSet & DataStream
与SQL查询中的FROM子句类似。执行对注册表的扫描。
Table orders = tableEnv.scan(“Orders”);
Select
DataSet & DataStream
与SQL SELECT语句类似。执行选择算子操作。
Table orders = tableEnv.scan(“Orders”);
Table result = orders.select(“a, c as d”);
Table result = orders.select("*");
Where / Filter
DataSet & DataStream
与SQL WHERE子句类似。过滤掉未通过过滤谓词的行。
Table orders = tableEnv.scan(“Orders”);
Table result = orders.where(“b === ‘red’");
Table result = orders.filter(“a % 2 === 0”);
GroupBy
DataSet & DataStream
与SQL GROUP BY子句类似。使用以下运行的聚合算子对分组键上的行进行分组,以按组聚合行。
Table orders = tableEnv.scan(“Orders”);
Table result = orders.groupBy(“a”).select( “a, b.sum as d”);
Distinct
DataSet & DataStream
与SQL DISTINCT子句类似。返回具有不同值组合的记录。
Table orders = tableEnv.scan(“Orders”);
Table result = orders.distinct();
Inner Join
DataSet & DataStream
关联两张表。两个表必须具有不同的字段名称,并且必须通过连接算子或使用where或filter算子定义至少一个相等连接谓词。
Table left = tableEnv.fromDataSet(ds1, “a, b,c”);
Table right = tableEnv.fromDataSet(ds2, “d, e, f”);
Table result = left.join(right).where(“a = d”).select(“a, b, e”);
Outer Join
DataSet & DataStream
与SQL LEFT / RIGHT JOIN子句类似。关联两张表。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。
Table left = tableEnv.fromDataSet(ds1, “a, b,c”);
Table right = tableEnv.fromDataSet(ds2, “d, e, f”);
Table leftOuterResult = left.leftOuterJoin(right, “a = d”).select(“a, b, e”);
Table rightOuterResult = left.rightOuterJoin(right, “a = d”).select(“a, b, e”);

3. 内置函数

  • 比较函数(=/like)

    String sql = “select word, sum(frequency) as frequency from WordCount where word = ‘xxx’ GROUP BY word”;

  • 逻辑函数

  • 算术函数(+/-)

    String sql = “select word, sum(frequency) + 1 as frequency from WordCount GROUP BY word”;

  • 字符串处理函数(UPPER/LOWER/SUBSTRING)

    String sql = “select UPPER(word) as word, sum(frequency) as frequency from WordCount GROUP BY word”;

  • 其他(MD5)

    String sql = “select MD5(word) as word, sum(frequency) as frequency from WordCount GROUP BY word”;

4. 自定义函数

用户自定义函数UDF/UDAF/UDTF

步骤:

  1. 继承函数 ScalarFunction
  2. 覆写方法 eval
  3. 注册函数
  4. 应用

Flink开发环境搭建和应用的配置、部署及运行

Flink开发环境搭建和应用的配置、部署及运行