本文
前往“校招VIP”小程序,访问更方便

【校招VIP】Flink之Java入门

csdn 09月11日

转载声明:文章来源:https://blog.csdn.net/baidu_35760874/article/details/124724594

介绍
Flink是一个处理流数据的组件,在实时计算等场景下可以发挥巨大的作用。
流数据一般分为:

有界数据流(知道数据的起点和终点,例如一个txt文件的数据)
无界数据流(不知道数据的终点,例如kafka消息、socket数据)

java demo
添加依赖

<properties>
<java.version>1.8</java.version>
<flink.version>1.12.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>

示例代码

无界数据流demo

配置netcat, 网上下载nc.exe 点我下载
在安装目录打开cmd,输入如下命令,配置端口为9000

nc -L -p 9000 -v

执行成功后如图所示

接下来编写代码,本demo实现了一个将字符串数据先按照逗号分割,然后转为大写的逻辑

package com.greenutility.mask.util;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkTest {

public static void main(String[] args) throws Exception {
// 1.准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置运行模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2.加载数据源
DataStreamSource<String> elementsSource = env.socketTextStream("127.0.0.1", 9000);
// 3.数据转换
DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String element, Collector<String> out) {
String[] wordArr = element.split(",");
for (String word : wordArr) {
out.collect(word);
}
}
});
//DataStream 下边为DataStream子类
SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
@Override
public String map(String value) {
return value.toUpperCase();
}
});
// 4.数据输出
source.print();
// 5.执行程序
env.execute("flink-hello-world");
}
}

执行main方法,flink已经开始监听netcat的socket数据了,此时我们在cmd里输入一些字符串

然后观察控制台的输出

我们可以看到,来自socket的字符串数据已经成功按照预期进行了处理

有界数据流demo

我们首先在本地新建一个test.txt文件,随便输入一些字符串

然后将上个demo中的加载数据源那一行代码替换为

DataStreamSource<String> elementsSource = env.readTextFile("D:\\test.txt");

执行main方法

数据处理成功!

回复

沈振衣

12月24日

收藏不息,战斗不止

0 0