> 文章列表 > springboot集成hadoop3.2.4HDFS

springboot集成hadoop3.2.4HDFS

springboot集成hadoop3.2.4HDFS


前言

记录springboot集成hadoop3.2.4版本,并且调用HDFS的相关接口,这里就不展示springboot工程的建立了,这个你们自己去建工程很多教程。


一、springboot配置文件修改

1.1 pom文件修改

 <!-- hadoop依赖 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.1.0</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.10</version></dependency>

完整pom配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.3</version></parent><groupId>com.hadoop</groupId><artifactId>demo</artifactId><version>0.0.1-SNAPSHOT</version><name>hadoop</name><description>Demo project for Spring Boot</description><properties><java.version>8</java.version><hadoop.version>3.2.4</hadoop.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--Lombok简化代码--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- hadoop依赖 --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>javax.servlet</groupId><artifactId>servlet-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.1.0</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.5.3</version></plugin></plugins></build></project>

1.2 properties文件修改

加入以下配置

hadoop.name-node: hdfs://192.168.184.129:8020
hadoop.namespace: /mydir

name-node是这个服务的地址,可以在hadoop的配置文件中找,或者直接看hadoop集群namenode网页也可以看到端口号。
我的集群的地址是以下这个:

http://192.168.184.129:9870/

springboot集成hadoop3.2.4HDFS
namespace是在hdfs上文件的地址,就是写文件要写到这个目录下面去。

二、springboot相关类配置类

2.1 新建config类

代码如下:

package com.hadoop.demo.config;import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.net.URI;@Configuration
@ConditionalOnProperty(name="hadoop.name-node")
@Slf4j
public class HadoopConfig {@Value("${hadoop.name-node}")private String nameNode;/*** Configuration conf=new Configuration();* 创建一个Configuration对象时,其构造方法会默认加载hadoop中的两个配置文件,* 分别是hdfs-site.xml以及core-site.xml,这两个文件中会有访问hdfs所需的参数值,* 主要是fs.default.name,指定了hdfs的地址,有了这个地址客户端就可以通过这个地址访问hdfs了。* 即可理解为configuration就是hadoop中的配置信息。* @return*/@Bean("fileSystem")public FileSystem createFs() throws Exception{//读取配置文件org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.defalutFS", nameNode);conf.set("dfs.replication", "1");FileSystem fs = null;//conf.set("fs.defaultFS","hdfs://ns1");//指定访问hdfs的客户端身份//fs = FileSystem.get(new URI(nameNode), conf, "root");// 文件系统// 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统try {URI uri = new URI(nameNode.trim());fs = FileSystem.get(uri,conf,"root");} catch (Exception e) {log.error("", e);}System.out.println("fs.defaultFS: "+conf.get("fs.defaultFS"));return  fs;}
}

2.2 新建hdfs操作类

代码如下:

package com.hadoop.demo.config;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.io.IOException;@Component
@ConditionalOnBean(FileSystem.class)
@Slf4j
public class HadoopTemplate {@Autowiredprivate FileSystem fileSystem;@Value("${hadoop.name-node}")private String nameNode;@Value("${hadoop.namespace:/}")private String nameSpace;@PostConstructpublic void init(){existDir(nameSpace,true);}public void uploadFile(String srcFile){copyFileToHDFS(false,true,srcFile,nameSpace);}public void uploadFile(boolean del,String srcFile){copyFileToHDFS(del,true,srcFile,nameSpace);}public void uploadFile(String srcFile,String destPath){copyFileToHDFS(false,true,srcFile,destPath);}public void uploadFile(boolean del,String srcFile,String destPath){copyFileToHDFS(del,true,srcFile,destPath);}public void delFile(String fileName){rmdir(nameSpace,fileName) ;}public void delDir(String path){nameSpace = nameSpace + "/" +path;rmdir(path,null) ;}public void download(String fileName,String savePath){getFile(nameSpace+"/"+fileName,savePath);}/*** 创建目录* @param filePath* @param create* @return*/public boolean existDir(String filePath, boolean create){boolean flag = false;if(StringUtils.isEmpty(filePath)){throw new IllegalArgumentException("filePath不能为空");}try{Path path = new Path(filePath);if (create){if (!fileSystem.exists(path)){fileSystem.mkdirs(path);}}if (fileSystem.isDirectory(path)){flag = true;}}catch (Exception e){log.error("", e);}return flag;}/*** 文件上传至 HDFS* @param delSrc       指是否删除源文件,true为删除,默认为false* @param overwrite* @param srcFile      源文件,上传文件路径* @param destPath     hdfs的目的路径*/public  void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {// 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txtPath srcPath = new Path(srcFile);// 目的路径if(StringUtils.isNotBlank(nameNode)){destPath = nameNode + destPath;}Path dstPath = new Path(destPath);// 实现文件上传try {// 获取FileSystem对象fileSystem.copyFromLocalFile(srcPath, dstPath);fileSystem.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);//释放资源//    fileSystem.close();} catch (IOException e) {log.error("", e);}}/*** 删除文件或者文件目录** @param path*/public void rmdir(String path,String fileName) {try {// 返回FileSystem对象if(StringUtils.isNotBlank(nameNode)){path = nameNode + path;}if(StringUtils.isNotBlank(fileName)){path =  path + "/" +fileName;}// 删除文件或者文件目录  delete(Path f) 此方法已经弃用fileSystem.delete(new Path(path),true);} catch (IllegalArgumentException | IOException e) {log.error("", e);}}/*** 从 HDFS 下载文件** @param hdfsFile* @param destPath 文件下载后,存放地址*/public void getFile(String hdfsFile,String destPath) {// 源文件路径if(StringUtils.isNotBlank(nameNode)){hdfsFile = nameNode + hdfsFile;}Path hdfsPath = new Path(hdfsFile);Path dstPath = new Path(destPath);try {// 下载hdfs上的文件fileSystem.copyToLocalFile(hdfsPath, dstPath);// 释放资源// fs.close();} catch (IOException e) {log.error("", e);}}public String getNameSpace(){return nameSpace;}}

2.3 新建HDFSutil类

package com.hadoop.demo.util;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;import java.io.IOException;
import java.net.URI;/*** hdfs基本操作*/
@Slf4j
public class HdfsUtil {/*** 获取文件系统* @param hdfsUri  nameNode地址 如"hdfs://10.10.1.142:9000"* @return*/public static FileSystem getFileSystem(String hdfsUri) {//读取配置文件Configuration conf = new Configuration();// 文件系统FileSystem fs = null;if(StringUtils.isBlank(hdfsUri)){// 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统try {fs = FileSystem.get(conf);} catch (IOException e) {log.error("", e);}}else{// 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统try {URI uri = new URI(hdfsUri.trim());fs = FileSystem.get(uri,conf);} catch (Exception e) {log.error("", e);}}return fs;}/*** 创建文件目录** @param hdfsUri* @param path*/public static void mkdir(String hdfsUri, String path) {try {// 获取文件系统FileSystem fs = getFileSystem(hdfsUri);if(StringUtils.isNotBlank(hdfsUri)){path = hdfsUri + path;}// 创建目录fs.mkdirs(new Path(path));//释放资源fs.close();} catch (IllegalArgumentException | IOException e) {log.error("", e);}}/*** 删除文件或者文件目录** @param path*/public static void rmdir(String hdfsUri,String path) {try {// 返回FileSystem对象FileSystem fs = getFileSystem(hdfsUri);if(StringUtils.isNotBlank(hdfsUri)){path = hdfsUri + path;}// 删除文件或者文件目录  delete(Path f) 此方法已经弃用fs.delete(new Path(path),true);// 释放资源fs.close();} catch (IllegalArgumentException | IOException e) {log.error("", e);}}/*** 根据filter获取目录下的文件** @param path* @param pathFilter* @return String[]*/public static String[] listFile(String hdfsUri, String path,PathFilter pathFilter) {String[] files = new String[0];try {// 返回FileSystem对象FileSystem fs = getFileSystem(hdfsUri);if(StringUtils.isNotBlank(hdfsUri)){path = hdfsUri + path;}FileStatus[] status;if(pathFilter != null){// 根据filter列出目录内容status = fs.listStatus(new Path(path),pathFilter);}else{// 列出目录内容status = fs.listStatus(new Path(path));}// 获取目录下的所有文件路径Path[] listedPaths = FileUtil.stat2Paths(status);// 转换String[]if (listedPaths != null && listedPaths.length > 0){files = new String[listedPaths.length];for (int i = 0; i < files.length; i++){files[i] = listedPaths[i].toString();}}// 释放资源fs.close();} catch (IllegalArgumentException | IOException e) {log.error("", e);}return files;}/*** 文件上传至 HDFS* @param hdfsUri* @param delSrc       指是否删除源文件,true为删除,默认为false* @param overwrite* @param srcFile      源文件,上传文件路径* @param destPath     hdfs的目的路径*/public static void copyFileToHDFS(String hdfsUri,boolean delSrc, boolean overwrite,String srcFile,String destPath) {// 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txtPath srcPath = new Path(srcFile);// 目的路径if(StringUtils.isNotBlank(hdfsUri)){destPath = hdfsUri + destPath;}Path dstPath = new Path(destPath);// 实现文件上传try {// 获取FileSystem对象FileSystem fs = getFileSystem(hdfsUri);fs.copyFromLocalFile(srcPath, dstPath);fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);//释放资源fs.close();} catch (IOException e) {log.error("", e);}}/*** 从 HDFS 下载文件** @param srcFile* @param destPath 文件下载后,存放地址*/public static void getFile(String hdfsUri, String srcFile,String destPath) {// 源文件路径if(StringUtils.isNotBlank(hdfsUri)){srcFile = hdfsUri + srcFile;}Path srcPath = new Path(srcFile);Path dstPath = new Path(destPath);try {// 获取FileSystem对象FileSystem fs = getFileSystem(hdfsUri);// 下载hdfs上的文件fs.copyToLocalFile(srcPath, dstPath);// 释放资源fs.close();} catch (IOException e) {log.error("", e);}}/*** 获取 HDFS 集群节点信息** @return DatanodeInfo[]*/public static DatanodeInfo[] getHDFSNodes(String hdfsUri) {// 获取所有节点DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];try {// 返回FileSystem对象FileSystem fs = getFileSystem(hdfsUri);// 获取分布式文件系统DistributedFileSystem hdfs = (DistributedFileSystem)fs;dataNodeStats = hdfs.getDataNodeStats();} catch (IOException e) {log.error("", e);}return dataNodeStats;}/*** 查找某个文件在 HDFS集群的位置** @param filePath* @return BlockLocation[]*/public static BlockLocation[] getFileBlockLocations(String hdfsUri, String filePath) {// 文件路径if(StringUtils.isNotBlank(hdfsUri)){filePath = hdfsUri + filePath;}Path path = new Path(filePath);// 文件块位置列表BlockLocation[] blkLocations = new BlockLocation[0];try {// 返回FileSystem对象FileSystem fs = getFileSystem(hdfsUri);// 获取文件目录FileStatus filestatus = fs.getFileStatus(path);//获取文件块位置列表blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());} catch (IOException e) {log.error("", e);}return blkLocations;}/*** 判断目录是否存在* @param hdfsUri* @param filePath* @param create* @return*/public boolean existDir(String hdfsUri,String filePath, boolean create){boolean flag = false;if (StringUtils.isEmpty(filePath)){return flag;}try{Path path = new Path(filePath);// FileSystem对象FileSystem fs = getFileSystem(hdfsUri);if (create){if (!fs.exists(path)){fs.mkdirs(path);}}if (fs.isDirectory(path)){flag = true;}}catch (Exception e){log.error("", e);}return flag;}
}

2.4 新建controller类

package com.hadoop.demo.control;import com.hadoop.demo.config.HadoopTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/hdfs")
@RestController
public class HdfsController {@Autowiredprivate HadoopTemplate hadoopTemplate;/*** 将本地文件srcFile,上传到hdfs* @param srcFile* @return*/@RequestMapping("/upload")public String upload(@RequestParam String srcFile){hadoopTemplate.uploadFile(srcFile);return "upload";}@RequestMapping("/delFile")public String del(@RequestParam String fileName){hadoopTemplate.delFile(fileName);return "delFile";}@RequestMapping("/download")public String download(@RequestParam String fileName,@RequestParam String savePath){hadoopTemplate.download(fileName,savePath);return "download";}
}

三、遇到的问题

项目启动后是从windows调用linux集群,启动一定会报错,如果没有配置windows的环境。

3.1 windows环境配置

报错如下

java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsat org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547) ~[hadoop-common-3.2.4.jar:na]

报错的原因是缺少了hadoop的环境配置。要做以下的配置。

3.2 相关hadoop配置下载

https://gitee.com/nkuhyx/winutils.git
下载地址在上面,我这里的hadoop版本是3.2.4,这里我选择的是版本接近的3.2.1
springboot集成hadoop3.2.4HDFS

3.3 修改电脑环境变量

我本地下载后安装到
D:\\javaTools\\hadoopwindowsclient\\hadoop-3.2.1
添加系统变量HADOOP_HOME

D:\\javaTools\\hadoopwindowsclient\\hadoop-3.2.1

添加到path

%HADOOP_HOME%\\bin

springboot集成hadoop3.2.4HDFS

3.4 重启电脑

配置好后重启电脑或者使用dos命令刷新环境变量,我这里直接重启电脑了,就懒得去弄命令了。

四、测试

4.1 调用上传接口

上传文件
本地D盘新建了一个测试文件,内容如下
springboot集成hadoop3.2.4HDFS
调用上传接口
srcfile为你本地的文件路径。

http://localhost:8080/hdfs/upload?srcFile=D:\\test.txt

结果:
点击namenode进来可以看到文件路径。
springboot集成hadoop3.2.4HDFS
点开这个文件
springboot集成hadoop3.2.4HDFS
可以看到文件已经上传到hdfs了,这里需要注意一个细节。
文件的格式必须是utf-8的如果不是的话,上传中文里面的文件是乱码,这个需要注意下。

4.2 下载文件

这里的filename是下载文件的路径。

http://localhost:8080/hdfs/download?fileName=test.txt&savePath=D:\\Download

下载到d盘下,结果如下
springboot集成hadoop3.2.4HDFS
打开内容和上传的一致,说明下载成功。

4.3 删除文件

http://localhost:8080/hdfs/delFile?fileName=test.txt

删除后重新查看namenode网址
springboot集成hadoop3.2.4HDFS
可以看到文件已经删除了。

总结

这里展示了springboot集成hadoopHDFS的相关操作以及遇到的问题解决,如果对你有帮助点个赞吧。