Section 3: Flink standalone/local cluster installation and submitting job code to the cluster

Previous: Section 2 – Batch processing (Scala implementation)

One addition first:

The difference between Flink Streaming and Batch:

Stream processing (Streaming)

  1. StreamExecutionEnvironment
  2. DataStream

Batch processing (Batch)

  1. ExecutionEnvironment
  2. DataSet

1. Installing a standalone-mode cluster

Prepare the environment: a JDK
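A quick way to confirm the JDK prerequisite before going any further (a minimal sketch; Flink 1.7 expects Java 8 or newer):

```shell
# Print the JDK version if one is on PATH, otherwise warn.
check_jdk() {
  if command -v java >/dev/null 2>&1; then
    java -version 2>&1 | head -n 1
  else
    echo "no JDK found on PATH; install Java 8+ before starting Flink"
  fi
}

check_jdk
```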

(1) Upload the Flink installation package to the Linux environment
(2) Extract the archive flink-1.7.0-bin-hadoop27-scala_2.11.tgz

[root@flink102 hadoop]# tar -zxvf flink-1.7.0-bin-hadoop27-scala_2.11.tgz -C module/

(3) Edit the configuration file

[root@flink102 conf]# vim flink-conf.yaml 

# configure the JobManager address
jobmanager.rpc.address: flink102

Edit the slaves file:

[root@flink102 conf]# vim slaves 

flink102 

(4) Start the cluster

# start the cluster
[root@flink102 bin]# ./start-cluster.sh

# output on the master node
Starting cluster.
Starting standalonesession daemon on host flink102.

# stop the cluster
[root@flink102 bin]# ./stop-cluster.sh

(5) Access the web UI
http://flink102:8081
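Before opening the browser, you can probe the JobManager's REST endpoint from the shell (an illustrative sketch, assuming the host name flink102 and default port 8081 from this setup, and that curl is installed):

```shell
# Report whether the Flink web UI answers on the given host/port.
check_flink_ui() {
  host="${1:-flink102}"
  port="${2:-8081}"
  if curl -sf --max-time 5 "http://$host:$port/overview" >/dev/null 2>&1; then
    echo "Flink web UI reachable at http://$host:$port"
  else
    echo "Flink web UI not reachable at http://$host:$port"
    return 1
  fi
}

check_flink_ui flink102 8081 || true
```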


2. Packaging the program code

(1) On top of the dependencies the pom file already provides, add the following
(the plugins needed for packaging):

 <build>
        <plugins>
                <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
                <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
                <!-- jar-packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- optionally set the jar's entry class -->
                            <mainClass>xuwei.tech.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The complete pom file, with all of its dependencies, is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.flink01</groupId>
    <artifactId>flink01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
           <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- jar-packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- optionally set the jar's entry class -->
                            <mainClass>xuwei.tech.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The packaging command, run from a cmd prompt:
(1) First change into your project directory, then execute:

D:\Flink_ShiZhan>mvn clean package -DskipTests
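After mvn finishes, the fat jar lands in target/. A small helper to locate it (an illustrative sketch, not part of the original text; the naming pattern comes from the jar-with-dependencies assembly descriptor used above):

```shell
# Print the path of the first *-jar-with-dependencies.jar under the given directory.
find_fat_jar() {
  dir="${1:-target}"
  ls "$dir"/*-jar-with-dependencies.jar 2>/dev/null | head -n 1
}

find_fat_jar target
```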

Viewed in the IDEA development tool:


3. Submitting and running code on the cluster

(1) Upload the packaged jar to the Linux environment
(2) Check whether the cluster is running

[root@flink102 bin]# jps
4037 Jps
3546 TaskManagerRunner
3084 StandaloneSessionClusterEntrypoint
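The two daemons above can be checked in one go (a sketch; it assumes jps from the JDK is on PATH and degrades gracefully if it is not):

```shell
# Report whether the standalone cluster's JobManager and TaskManager JVMs are up.
check_flink_daemons() {
  procs=$(jps 2>/dev/null || true)
  for d in StandaloneSessionClusterEntrypoint TaskManagerRunner; do
    if printf '%s\n' "$procs" | grep -q "$d"; then
      echo "$d: running"
    else
      echo "$d: NOT running"
    fi
  done
}

check_flink_daemons
```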

(3) Run the job

# run this command first
[root@flink102 bin]# nc -l 9000

# then run this command
[root@flink102 flink-1.7.0]# bin/flink run flink01-1.0-SNAPSHOT-jar-with-dependencies.jar 
Starting execution of program
No port set.use default port 9000
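The submission step can be wrapped defensively (a hypothetical helper, not from the original text; the bin/flink path assumes you run it from the Flink installation directory):

```shell
# Submit a jar with bin/flink run, but fail fast if the jar file is missing.
submit_flink_job() {
  jar="$1"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar" >&2
    return 1
  fi
  bin/flink run "$jar"
}
```

Usage: `submit_flink_job flink01-1.0-SNAPSHOT-jar-with-dependencies.jar`.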

View the running job in the web UI.


4. Working with the local cluster's web UI

(1) When we enter data on the command-line port, the web page receives the data as well:

[root@flink102 bin]# nc -l 9000
aa
bb
cc


(2) Checking the logs (viewing the log output)

[root@flink102 log]# tail -10 flink-root-taskexecutor-0-flink102.out 
# no output yet
[root@flink102 log]# 

(3) We can specify any entry class; the command is:

# open nc first
[root@flink102 ~]# nc -l 9000

# then run a different entry class (any entry class packaged into the jar can be specified)
[root@flink102 flink-1.7.0]# bin/flink run -c xuwei.tech.SocketWindowWordCountScala flink01-1.0-SNAPSHOT-jar-with-dependencies.jar 
Starting execution of program
No port set.use default port 9999

(4) Stop a job shown in the Web console

[root@flink102 flink-1.7.0]# bin/flink cancel 75d076b45fe33cd470f998cd3bc8fa12

Cancelling job 75d076b45fe33cd470f998cd3bc8fa12.
Cancelled job 75d076b45fe33cd470f998cd3bc8fa12.

