Flink basics
Contents
Flink 基础
Flink组件栈
Flink API
1. DataSet API
Transformation | Description |
---|---|
Map | 读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function: dataStream.map{ x => x*2 } |
FlatMap | 读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function: dataStream.flatMap{ str => str.split(" “) } |
Filter | 对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter: dataStream.filter{ _!=0 } |
MapPartition | 单个函数调用中转换并行分区。函数将分区作为一个“迭代器”,并可以产生任意数量的结果值。每个分区中的元素数量取决于并行度和以前的操作。 data.mapPartition{ in=>Some(in.size) } |
Reduce | 将当前元素与上一个reduce后的值进行合并,再返回新合并的值。 data.reduce{ _+_ } |
2. DataStream API
Transformation | Description |
---|---|
Map DataStream → DataStream |
读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function: dataStream.map{ x => x*2 } |
FlatMap DataStream → DataStream |
读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function: dataStream.flatMap{ str => str.split(” “) } |
Filter DataStream → DataStream |
对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter: dataStream.filter{ _!=0 } |
KeyBy DataStream→KeyedStream |
逻辑上将流分区为不相交的分区,每个分区包含相同key的元素。在内部通过hash分区来实现。关于如何指定分区的keys请参阅keys。该transformation返回一个KeyedDataStream。 dataStream.keyBy(“someKey”) dataStream.keyBy(0) |
Reduce KeyedStream→DataStream |
在一个KeyedStream上不断进行reduce操作。将当前元素与上一个reduce后的值进行合并,再返回新合并的值。—个构造局部求和流的reduce function : keyedStream.reduce { _+_ } |
Aggregations KeyedStream→DataStream |
在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。 keyedStream.sum(0) keyedStream.sum(“key”) keyedStream.min(0) keyedStream.min(“key”) keyedStream.max(0) keyedStream.max(“key”) keyedStream.minBy(0) keyedStream.minBy(“key”) keyedStream.maxBy(0) keyedStream.maxBy(“key”) |
Windows KeyedStream→WindowedStream |
Windows可定义在已分区的KeyedStreams上。Windows会在每个key对应的数据上根据一些特征(例如,在最近5秒内到达的数据)进行分组。 dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) |
3. Source API
- readTextFile(path)
- socketTextStream
- fromCollection
- DataStream Connector
时间
- 事件时间:事件创建的时间。通常由事件中的时间戳描述,例如Kafka消息中生成的时间戳。
- 摄入时间:事件进入Flink数据流运算符的时间。
- 处理时间:每一个执行时间操作的算符的本地时间。
并行度
Flink程序由多个任务组成(source、transformation和sink)。一个任务有多个并行的实例(线程)来执行,一个任务的并行实例(线程)数目就被称为该任务的并行度。
- 算子级别: 设置flink的编程API修改
- 运行环境级别: 设置executionEnvironment的方法修改并行度
- 客户端级别 :
$FLINK_HOME/bin/flink
的-p参数 - 系统修改: 修改
$FLINK_HOME/conf/flink-conf.yaml
文件
注:
-
算子级别>运行环境级别>客户端级别>系统级别
-
并行度不能大于Slot个数
Flink Table & SQL
1. 程序结构
|
|
2. Table和SQL的算子操作
API算子 | Description |
---|---|
Scan DataSet & DataStream |
与SQL查询中的FROM子句类似。执行对注册表的扫描。 Table orders = tableEnv.scan(“Orders”); |
Select DataSet & DataStream |
与SQL SELECT语句类似。执行选择算子操作。 Table orders = tableEnv.scan(“Orders”); Table result = orders.select(“a, c as d”); Table result = orders.select("*"); |
Where / Filter DataSet & DataStream |
与SQL WHERE子句类似。过滤掉未通过过滤谓词的行。 Table orders = tableEnv.scan(“Orders”); Table result = orders.where(“b === ‘red’"); Table result = orders.filter(“a % 2 === 0”); |
GroupBy DataSet & DataStream |
与SQL GROUP BY子句类似。使用以下运行的聚合算子对分组键上的行进行分组,以按组聚合行。 Table orders = tableEnv.scan(“Orders”); Table result = orders.groupBy(“a”).select( “a, b.sum as d”); |
Distinct DataSet & DataStream |
与SQL DISTINCT子句类似。返回具有不同值组合的记录。 Table orders = tableEnv.scan(“Orders”); Table result = orders.distinct(); |
Inner Join DataSet & DataStream |
关联两张表。两个表必须具有不同的字段名称,并且必须通过连接算子或使用where或filter算子定义至少一个相等连接谓词。 Table left = tableEnv.fromDataSet(ds1, “a, b,c”); Table right = tableEnv.fromDataSet(ds2, “d, e, f”); Table result = left.join(right).where(“a = d”).select(“a, b, e”); |
Outer Join DataSet & DataStream |
与SQL LEFT / RIGHT JOIN子句类似。关联两张表。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。 Table left = tableEnv.fromDataSet(ds1, “a, b,c”); Table right = tableEnv.fromDataSet(ds2, “d, e, f”); Table leftOuterResult = left.leftOuterJoin(right, “a = d”).select(“a, b, e”); Table rightOuterResult = left.rightOuterJoin(right, “a = d”).select(“a, b, e”); |
3. 内置函数
-
比较函数(=/like)
String sql = “select word, sum(frequency) as frequency from WordCount where word = ‘xxx’ GROUP BY word”;
-
逻辑函数
-
算术函数(+/-)
String sql = “select word, sum(frequency) + 1 as frequency from WordCount GROUP BY word”;
-
字符串处理函数(UPPER/LOWER/SUBSTRING)
String sql = “select UPPER(word) as word, sum(frequency) as frequency from WordCount GROUP BY word”;
-
其他(MD5)
String sql = “select MD5(word) as word, sum(frequency) as frequency from WordCount GROUP BY word”;
4. 自定义函数
用户自定义函数UDF/UDAF/UDTF
步骤:
- 继承函数 ScalarFunction
- 覆写方法 eval
- 注册函数
- 应用