Contents

Flink+Maven+IDEA environment construction and code testing

maven环境搭建

maven下载https://maven.apache.org/download.cgi

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/1

Maven 3.3+ require JDK 1.7 or above to execute

解压缩到相应文件夹,

1
tar zvxf apache-maven-3.6.3-bin.tar.gz

修改maven镜像,进入maven的conf文件夹(在bin同级目录下),使用 vim settings.xml 编辑文本,输入反斜杠‘/’,搜索mirrors,按enter定位。设置如下代码:

1
2
3
4
5
6
<mirror>
      <id>alimaven</id>
      <name>aliyun maven</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <mirrorOf>central</mirrorOf>
</mirror>

用的是阿里的镜像

配置环境变量

在家目录下.bashrc文件中添加环境变量(其他添加环境变量方法也可)

1
nano ~/.bashrc
1
2
export MAVEN_HOME=/opt/apache-maven-3.6.3-bin/apache-maven-3.6.3
export PATH=$MAVEN_HOME/bin:$PATH

刷新配置 source ~/.bashrc

mvn -v 验证maven安装是否成功:

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/2

安装成功,返回了maven的版本以及Java的版本

IDEA安装配置

下载地址:https://www.jetbrains.com/idea/download

flink开发环境搭建

flink下载https://flink.apache.org/downloads.html#apache-flink-1120

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/3

解压到相应文件夹,可以将解压后到bin文件目录添加到环境变量,方便终端命令行操作

运行集群 ./bin/start-cluster.sh

浏览器访问127.0.0.1:8081,出现如下即启动成功:

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/4

使用maven创建flink项目

命令行创建Flink初始模板项目

1
2
3
4
mvn archetype:generate                               \    //在指定相应目录下
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.10.0

1
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/5

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/6

创建成功后,项目目录结构大致如下:

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/7

IDEA能导入已经创建的环境:打开IDEA之后open对应的项目

如果不想通过命令行的方式生成 maven 工程,可以通过如下设置在 IDEA 中创建 Flink 应用的模板工程,以 Java 为例

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/8

点击Add Acrhetypr…,在弹出的对话框填写如下内容

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/9

选择我们添加的 archetype 便可继续创建 maven 工程。

代码测试

项目结构如下:

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/10

flink-quickstart-java快速地构建了一个基本的Flink项目框架,并生成创建了两个模板程序文件:用于流处理的StreamingJob和用于批处理的BatchJob

新建Java类:SocketTextStreamWordCount

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package org.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 套接字流计数ls
 */

public class SocketTextStreamWordCount {
    public static void main(String[] args) throws Exception{
        //参数检查
        if (args.length!=2){
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }

        String hostname=args[0];
        Integer port=Integer.parseInt(args[1]);

        //流执行环境
        final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();

        //套接字获取数据
        DataStreamSource<String> stream=env.socketTextStream(hostname,port);

        //计数
        SingleOutputStreamOperator<Tuple2<String,Integer>> sum=stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");

    }

    public static final class LineSplitter implements FlatMapFunction<String,Tuple2<String,Integer>>{
        @Override
        public void flatMap(String s,Collector<Tuple2<String,Integer>> collector){
            String[] tokens=s.toLowerCase().split("\\W+");
            for (String token:tokens){
                if(token.length()>0){
                    collector.collect(new Tuple2<String, Integer>(token,1));
                }
            }
        }
    }
}

工程打包

进入pom.xml同级目录,输入:mvn clean package -Dmaven.test.skip=true

上述表示:不但跳过单元测试的运行,也跳过测试代码的编译。

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/11

在项目下会生成一个target目录,打好的JAR包就在这里,如下图所示:

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/12

flink-test-1.0-SNAPSHOT.jar为打好的jar包

启动flink集群:

1
./bin/start-cluster.sh

另开终端, 开启9000端口

1
nc -l 9000

运行程序

1
2
3
4
5
6
#进入flink安装目录,执行命令
./bin/flink run -c org.example.SocketTextStreamWordCount /home/leslie/IdeaProjects/flink-test/target/flink-test-1.0-SNAPSHOT.jar 127.0.0.1 9000
 
#org.example.SocketTextStreamWordCount为jar中执行的方法
#/home/leslie/IdeaProjects/flink-test/target/flink-test-1.0-SNAPSHOT.jar 指明jar程序
#127.0.0.1 9000 为传入的参数,指明套接字的主机地址和端口

在9000终端输入数据

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/13

在Web上能够看到job正在跑

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/14

在Task Manager页面stdout输出能看到运行的结果

/images/2022-08-12-Flink+Maven+IDEA环境搭建及代码测试/15

代码测试成功