> 文章列表 > Flink系列-5、Flink DataSet API介绍

Flink系列-5、Flink DataSet API介绍

Flink系列-5、Flink DataSet API介绍

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

数据系列文章目录

官方网址:https://flink.apache.org/

学习资料:https://flink-learning.org.cn/
Flink系列-5、Flink DataSet API介绍

目录

  • Flink DataSet API编程模型
  • Flink DataSet API编程基本步骤
  • 输入数据集Data Source
    • 基于本地集合的source(Collection-based-source)
      • 代码
    • 在代码中指定并行度
    • 基于文件的source(File-based-source)
      • 读取本地文件
      • 读取HDFS数据
      • 读取CSV数据
      • 基于文件的source(遍历目录)
      • 读取压缩文件

Flink DataSet API编程模型

Flink系列-5、Flink DataSet API介绍

Flink DataSet API编程基本步骤

  • 获取执行环境(ExecutionEnvironment)
  • 加载/创建初始数据集
  • 对数据集进行各种转换操作(生成新的DataSet)
  • 指定将计算的结果输出
  • 提交任务(可选)

输入数据集Data Source

Data Sources 是什么呢?就字面意思其实就可以知道 数据来源
Flink 做为一款流式计算框架,它可用来做批处理,也可以用来做流处理,这个 Data Sources 就是数据的来源地。
flink在批处理中常见的source主要有两大类。

  • 基于本地集合的source(Collection-based-source)
  • 基于文件的source(File-based-source)

基于本地集合的source(Collection-based-source)

在flink最常见的创建DataSet方式有三种:

  • 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
    注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements(“haha”, 1);
    Flink系列-5、Flink DataSet API介绍
    源码注释中有写:
    Flink系列-5、Flink DataSet API介绍
  • 使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue
  • 使用env.generateSequence()方法创建基于Sequence的DataSet

代码

package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;/* @author lwh* @date 2023/4/11* @description/
public class TestSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 1. 从元素中获取DataSource<String> source1 = env.fromElements("haha", "heihei");source1.print();// 2. 从元素中获取,但是元素类型不同
//        DataSource<Object> source2 = env.fromElements("haha", 1);
//        source2.print();// 3. 从List中获取List<String> list = new ArrayList<>();list.add("haha");list.add("heihei");DataSource<String> listSource = env.fromCollection(list);listSource.print();// 4. 从Set中获取Set<String> set = new HashSet<>();set.add("lalala");set.add("guaguagua");DataSource<String> setSource = env.fromCollection(set);setSource.print();// 5. 从队列中获取Queue<String> queue = new ArrayBlockingQueue<>(10);queue.add("Spark");queue.add("Hadoop");DataSource<String> queueSource = env.fromCollection(queue);queueSource.print();// 6. 生成序列DataSource<Long> seqSource = env.generateSequence(1, 10);seqSource.print();}
}

在代码中指定并行度

指定全局并行度
env.setParallelism(12);
获得全局并行度
env.getParallelism();

指定算子设置并行度

Flink系列-5、Flink DataSet API介绍

获得指定算子的并行度
eventSource.getParallelism();

基于文件的source(File-based-source)

Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

  • 读取本地文件数据
  • 读取HDFS文件数据
  • 读取CSV文件数据
  • 读取压缩文件
  • 遍历目录

下面分别演示每个数据源的加载方式。

读取本地文件

Flink的批处理可以直接通过readTextFile()方法读取文件来创建数据源,方法如下:

package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class BatchFromFileDemo {public static void main(String[] args) throws Exception {//1:初始化运行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2:读取本地文件数据源DataSource<String> localLines = env.readTextFile("./data/input/wordcount.txt");//3:打印数据localLines.print();}
}

读取HDFS数据

package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class HDFSFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// Read HDFS FileDataSource<String> hdfsSource = env.readTextFile("hdfs://node1:8020/input/license.txt");hdfsSource.print();}
}

读取CSV数据

package entity;/* @author lwh* @date 2023/4/11* @description/
public class Student {private Integer id;private String name;private Integer sort;private Double score;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getSort() {return sort;}public void setSort(Integer sort) {this.sort = sort;}public Double getScore() {return score;}public void setScore(Double score) {this.score = score;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\\'' +", sort=" + sort +", score=" + score +'}';}
}
package batch.source;import entity.Student;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description/
public class CSVFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();/* 参数说明:* fieldDelimiter设置分隔符,默认的是","* ignoreFirstLine忽略第一行* includeFields是设置选取哪几列,我这里是第四列不选取的。* pojoType和后面字段名,就是对应列。字段名是不需要对应原始文件header字段名,* 但必须与POJO例如UserInfo.class里的字段名一一对应*/CsvReader csvReader = env.readCsvFile("data/input/score.csv");DataSource<Student> csvSource = csvReader.fieldDelimiter(",").ignoreFirstLine().includeFields(true, true, true, true).pojoType(Student.class, "id", "name", "sort", "score");csvSource.print();}
}

Flink系列-5、Flink DataSet API介绍

基于文件的source(遍历目录)

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。
对于从文件中读取数据,当读取的多个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取

package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;/* @author lwh* @date 2023/4/11* @description 递归读取目录内的文件/
public class RecursiveFileReadSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 生成配置参数Configuration param = new Configuration();param.setBoolean("recursive.file.enumeration", true);// 读取目录配合递归DataSource<String> source = env.readTextFile("./data").withParameters(param);source.print();}
}

Flink系列-5、Flink DataSet API介绍

读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式 扩展名
DEFLATE deflate
GZIP .gz,.gzip
Bzip2 .bz2
XZ .xz
package batch.source;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;/* @author lwh* @date 2023/4/11* @description 读取压缩文件:gz,xz/
public class CompressFileSource {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 读取压缩文件DataSource<String> gzSource = env.readTextFile("data/input/test.gz");gzSource.print();}
}

注意:读取压缩文件,不能并行处理,因此加载解压的时间会稍微有点长。