Flink系列-5、Flink DataSet API介绍
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
大数据系列文章目录
官方网址:https://flink.apache.org/
学习资料:https://flink-learning.org.cn/
目录
- Flink DataSet API编程模型
- Flink DataSet API编程基本步骤
- 输入数据集Data Source
-
- 基于本地集合的source(Collection-based-source)
-
- 代码
- 在代码中指定并行度
- 基于文件的source(File-based-source)
-
- 读取本地文件
- 读取HDFS数据
- 读取CSV数据
- 基于文件的source(遍历目录)
- 读取压缩文件
Flink DataSet API编程模型
Flink DataSet API编程基本步骤
- 获取执行环境(ExecutionEnvironment)
- 加载/创建初始数据集
- 对数据集进行各种转换操作(生成新的DataSet)
- 指定将计算的结果输出
- 提交任务(可选)
输入数据集Data Source
Data Sources 是什么呢?就字面意思其实就可以知道 数据来源 。
Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。
flink在批处理中常见的source主要有两大类。
- 基于本地集合的source(Collection-based-source)
- 基于文件的source(File-based-source)
基于本地集合的source(Collection-based-source)
在flink最常见的创建DataSet方式有三种:
- 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements(“haha”, 1);
源码注释中有写:
- 使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue
- 使用env.generateSequence()方法创建基于Sequence的DataSet
代码
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;/* @author lwh* @date 2023/4/11* @description/
public class TestSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 1. 从元素中获取DataSource<String> source1 = env.fromElements("haha", "heihei");source1.print();// 2. 从元素中获取,但是元素类型不同
// DataSource<Object> source2 = env.fromElements("haha", 1);
// source2.print();// 3. 从List中获取List<String> list = new ArrayList<>();list.add("haha");list.add("heihei");DataSource<String> listSource = env.fromCollection(list);listSource.print();// 4. 从Set中获取Set<String> set = new HashSet<>();set.add("lalala");set.add("guaguagua");DataSource<String> setSource = env.fromCollection(set);setSource.print();// 5. 从队列中获取Queue<String> queue = new ArrayBlockingQueue<>(10);queue.add("Spark");queue.add("Hadoop");DataSource<String> queueSource = env.fromCollection(queue);queueSource.print();// 6. 生成序列DataSource<Long> seqSource = env.generateSequence(1, 10);seqSource.print();}
}
在代码中指定并行度
指定全局并行度:
env.setParallelism(12);
获得全局并行度:
env.getParallelism();
指定算子设置并行度:
获得指定算子的并行度:
eventSource.getParallelism();
基于文件的source(File-based-source)
Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:
- 读取本地文件数据
- 读取HDFS文件数据
- 读取CSV文件数据
- 读取压缩文件
- 遍历目录
下面分别演示每个数据源的加载方式。
读取本地文件
Flink的批处理可以直接通过readTextFile()方法读取文件来创建数据源,方法如下:
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class BatchFromFileDemo {public static void main(String[] args) throws Exception {//1:初始化运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2:读取本地文件数据源DataSource<String> localLines = env.readTextFile("./data/input/wordcount.txt");//3:打印数据localLines.print();}
}
读取HDFS数据
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class HDFSFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// Read HDFS FileDataSource<String> hdfsSource = env.readTextFile("hdfs://node1:8020/input/license.txt");hdfsSource.print();}
}
读取CSV数据
package entity;/* @author lwh* @date 2023/4/11* @description/
public class Student {private Integer id;private String name;private Integer sort;private Double score;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getSort() {return sort;}public void setSort(Integer sort) {this.sort = sort;}public Double getScore() {return score;}public void setScore(Double score) {this.score = score;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\\'' +", sort=" + sort +", score=" + score +'}';}
}
package batch.source;import entity.Student;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class CSVFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();/* 参数说明:* fieldDelimiter设置分隔符,默认的是","* ignoreFirstLine忽略第一行* includeFields是设置选取哪几列,我这里是第四列不选取的。* pojoType和后面字段名,就是对应列。字段名是不需要对应原始文件header字段名,* 但必须与POJO例如UserInfo.class里的字段名一一对应*/CsvReader csvReader = env.readCsvFile("data/input/score.csv");DataSource<Student> csvSource = csvReader.fieldDelimiter(",").ignoreFirstLine().includeFields(true, true, true, true).pojoType(Student.class, "id", "name", "sort", "score");csvSource.print();}
}
基于文件的source(遍历目录)
flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据,当读取的多个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;/* @author lwh* @date 2023/4/11* @description 递归读取目录内的文件/
public class RecursiveFileReadSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 生成配置参数Configuration param = new Configuration();param.setBoolean("recursive.file.enumeration", true);// 读取目录配合递归DataSource<String> source = env.readTextFile("./data").withParameters(param);source.print();}
}
读取压缩文件
对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。
压缩格式 | 扩展名 |
---|---|
DEFLATE | deflate |
GZIP | .gz,.gzip |
Bzip2 | .bz2 |
XZ | .xz |
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description 读取压缩文件:gz,xz/
public class CompressFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 读取压缩文件DataSource<String> gzSource = env.readTextFile("data/input/test.gz");gzSource.print();}
}
注意:读取压缩文件,不能并行处理,因此加载解压的时间会稍微有点长。