SpringBoot使用Hbase
SpringBoot使用Hbase
文章目录
- SpringBoot使用Hbase
- 一,引入依赖
- 二,配置文件添加自己的属性
- 三,配置类注入HBASE配置
- 四,配置Hbase连接池
- 五,配置操作服务类
一,引入依赖
<!-- HBase client library. The slf4j-log4j12 binding is excluded; presumably to avoid
     clashing with the logging binding Spring Boot already provides (confirm). -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.3.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
二,配置文件添加自己的属性
# Custom HBase properties read by HBaseConfig through @Value:
#   hbase.zookeeper.quorum, hbase.zookeeper.property.clientPort, hbase.master.port
hbase:
  zookeeper:
    # Comma-separated list of ZooKeeper hosts
    quorum: 10.xxx.xx.153,10.xxx.xx.154,10.xxx.xx.155
    property:
      clientPort: 2181
  master:
    port: 9001
三,配置类注入HBASE配置
package com.hbase.config;import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @ClassName: HBaseConfig* @author: Leemon* @Description: TODO* @date: 2023/4/12 18:06* @version: 1.0*/
@Configuration
@RefreshScope
public class HBaseConfig {@Value("${hbase.zookeeper.quorum}")private String zookeeperQuorum;@Value("${hbase.zookeeper.property.clientPort}")private String clientPort;@Value("${hbase.master.port}")private String masterPort;@Beanpublic org.apache.hadoop.conf.Configuration hbaseConfiguration() {org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", zookeeperQuorum);conf.set("hbase.zookeeper.property.clientPort", clientPort);// 如果hbase是集群,这个必须加上// 这个ip和端口是在hadoop/mapred-site.xml配置文件配置的conf.set("hbase.master", zookeeperQuorum + ":" + masterPort);conf.set("hbase.client.keyvalue.maxsize", "20971520");conf = HBaseConfiguration.create(conf);return conf;}}
四,配置Hbase连接池
这里没有使用懒加载模式,而是在启动时就创建好初始连接,以避免启动后第一次访问时耗时过长。
package com.hbase.config;import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;/* @ClassName: HbaseConnectionPool* @author: Leemon* @Description: TODO* @date: 2023/4/13 9:45* @version: 1.0*/
@Component
@Slf4j
public class HbaseConnectionPool {/* 连接池最大的大小*/private int nMaxConnections = 20;/* 连接池自动增加的大小*/private int nIncrConnectionAmount = 3;/* 连接池的初始大小*/private int nInitConnectionAmount = 3;/* 存放连接池中数据库连接的向量,初始时为null*/private Vector vcConnections = null;@Resourceprivate Configuration hbaseConfiguration;@PostConstructpublic void init() {try {vcConnections = new Vector();createConnections(nInitConnectionAmount);} catch (Exception e) {e.printStackTrace();}}public synchronized Connection getConnection() {// 确保连接池己被创建if (vcConnections == null) {// 连接池还没创建,则返回nullreturn null;}// 获得一个可用的数据库连接Connection conn = getFreeConnection();// 如果目前没有可以使用的连接,即所有的连接都在使用中while (conn == null) {// 等一会再试try {wait(250);} catch (InterruptedException e) {e.printStackTrace();}// 重新再试,直到获得可用的连接,如果getFreeConnection()返回的为null,则表明创建一批连接后也不可获得可用连接conn = getFreeConnection();}// 返回获得的可用的连接return conn;}/* 本函数从连接池向量 connections 中返回一个可用的的数据库连接,如果* 当前没有可用的数据库连接,本函数则根据 incrementalConnections 设置* 的值创建几个数据库连接,并放入连接池中。* 如果创建后,所有的连接仍都在使用中,则返回 null* @return* 返回一个可用的数据库连接*/private Connection getFreeConnection() {// 从连接池中获得一个可用的数据库连接Connection conn = findFreeConnection();if (conn == null) {// 如果目前连接池中没有可用的连接// 创建一些连接try {createConnections(nIncrConnectionAmount);} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();log.error("create new connection fail.", e);}// 重新从池中查找是否有可用连接conn = findFreeConnection();if (conn == null) {// 如果创建连接后仍获得不到可用的连接,则返回 nullreturn null;}}return conn;}/* 创建由 numConnections 指定数目的数据库连接 , 并把这些连接* 放入 connections 向量中* @param _nNumConnections 要创建的数据库连接的数目* @throws Exception*/private void createConnections(int _nNumConnections) throws Exception {// 循环创建指定数目的数据库连接for (int x = 0; x < _nNumConnections; x++) {// 是否连接池中的数据库连接的数量己经达到最大?最大值由类成员 maxConnections// 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。// 如果连接数己经达到最大,即退出。if (this.nMaxConnections > 0 && this.vcConnections.size() >= this.nMaxConnections) {log.warn("已达到最大连接数,不能再增加连接");throw new Exception("已达到最大连接数"+ 
nMaxConnections+",不能再增加连接");}// 增加一个连接到连接池中(向量 connections 中)vcConnections.addElement(new ConnectionWrapper(newConnection()));log.info("HBase数据库连接己创建 ...... " + x);}}/* 查找池中所有的連接,查找一个可用的數據庫連接,* 如果没有可用的連結,返回null* @return* 返回一個可用的數據庫連接*/private Connection findFreeConnection() {Connection conn = null;ConnectionWrapper connWrapper = null;//獲得連接池向量中所有的對象Enumeration enumerate = vcConnections.elements();//遍歷所有的对象,看是否有可用的連接while (enumerate.hasMoreElements()) {connWrapper = (ConnectionWrapper) enumerate.nextElement();if (!connWrapper.isBusy()) {//如果此對象不忙,則獲得它的數據庫連接并把它設為忙conn = connWrapper.getConnection();connWrapper.setBusy(true);// 己经找到一个可用的連接,退出break;}}// 返回找到的可用連接return conn;}/*创建一个新的数据库连接并返回它* @return* 返回一个新创建的数据库连接*/private Connection newConnection() {/ hbase 连接 */Connection conn = null;// 创建一个数据库连接try {conn = ConnectionFactory.createConnection(hbaseConfiguration);} catch (IOException e) {log.error("创建HBase数据库连接失败!");e.printStackTrace();}// 返回创建的新的数据库连接return conn;}public synchronized void releaseConnection(Connection conn) {if (this.vcConnections == null) {log.info("连接池不存在,无法返回此连接到连接池中!!");} else {ConnectionWrapper connWrapper = null;Enumeration enumerate = this.vcConnections.elements();while(enumerate.hasMoreElements()) {connWrapper = (ConnectionWrapper) enumerate.nextElement();if (conn == connWrapper.getConnection()) {connWrapper.setBusy(false);break;}}}}class ConnectionWrapper {/* 数据库连接*/private Connection connection = null;/* 此连接是否正在使用的标志,默认没有正在使用*/private boolean busy = false;/* 构造函数,根据一个 Connection 构告一个 PooledConnection 对象*/public ConnectionWrapper(Connection connection) {this.connection = connection;}/* 返回此对象中的连接*/public Connection getConnection() {return connection;}/* 设置此对象的连接*/public void setConnection(Connection connection) {this.connection = connection;}/* 获得对象连接是否忙*/public boolean isBusy() {return busy;}/* 设置对象的连接正在忙*/public void setBusy(boolean busy) {this.busy = busy;}}}
init() 方法在初始化连接池时创建初始数量(nInitConnectionAmount,默认 3 个)的连接。
五,配置操作服务类
操作类接口 HbaseService.java
package com.hbase.service;import org.apache.hadoop.hbase.client.Scan;import java.util.Map;/* @InterfaceName: HbaseService* @author: Leemon* @Description: TODO* @date: 2023/4/12 18:11* @version: 1.0*/
public interface HbaseService {

    /**
     * Scans a table, optionally bounded by a start and a stop row key.
     *
     * <p>In the implementation shipped with this interface the bounds are applied only
     * when both keys are non-blank; otherwise the scan is unbounded.
     *
     * @param tableName   name of the table to scan
     * @param startRowKey row key at which the scan starts
     * @param stopRowKey  row key at which the scan stops
     * @return a map from row key to that row's column-qualifier/value pairs
     */
    Map<String, Map<String, String>> getResultScanner(String tableName, String startRowKey, String stopRowKey);

    /**
     * Reads all columns of one row.
     *
     * @param tableName name of the table
     * @param rowKey    key of the row to read
     * @return column-qualifier/value pairs of the row
     */
    Map<String, String> getRowData(String tableName, String rowKey);

    /**
     * Reads the columns of one column family of a row.
     *
     * @param tableName  name of the table
     * @param rowKey     key of the row to read
     * @param familyName column family to read
     * @return column-qualifier/value pairs of the family
     */
    Map<String, String> getFamilyValue(String tableName, String rowKey, String familyName);

    /**
     * Reads the value of a single column of a row.
     *
     * @param tableName  name of the table
     * @param rowKey     key of the row to read
     * @param familyName column family of the column
     * @param columnName qualifier of the column
     * @return the column value as a string
     */
    String getColumnValue(String tableName, String rowKey, String familyName, String columnName);

    /**
     * Runs an arbitrary scan against a table.
     *
     * @param tableName name of the table to scan
     * @param scan      the scan to execute
     * @return a map from row key to that row's column-qualifier/value pairs
     */
    Map<String, Map<String, String>> queryData(String tableName, Scan scan);
}
接口实现类 HbaseServiceImpl.java
package com.hbase.service.impl;import com.hbase.config.HbaseConnectionPool;
import com.hbase.service.HbaseService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.*;/* @ClassName: HbaseServiceImpl* @author: Leemon* @Description: TODO* @date: 2023/4/12 18:13* @version: 1.0*/
@Slf4j
@Service
public class HbaseServiceImpl implements HbaseService {@Resourceprivate HbaseConnectionPool pool;@Overridepublic Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){Scan scan = new Scan();if(StringUtils.isNotBlank(startRowKey) && StringUtils.isNotBlank(stopRowKey)){scan.withStartRow(Bytes.toBytes(startRowKey));scan.withStopRow(Bytes.toBytes(stopRowKey));}return this.queryData(tableName,scan);}public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){Scan scan = new Scan();if(StringUtils.isNotBlank(prefix)){Filter filter = new PrefixFilter(Bytes.toBytes(prefix));scan.setFilter(filter);}return this.queryData(tableName,scan);}@Overridepublic Map<String,Map<String,String>> queryData(String tableName, Scan scan){Map<String,Map<String,String>> result = new HashMap<>();ResultScanner rs = null;// 获取表Table table= null;Connection connection = null;try {connection = pool.getConnection();table = getTable(connection, tableName);rs = table.getScanner(scan);for (Result r : rs) {//每一行数据Map<String,String> columnMap = new HashMap<>();String rowKey = null;for (Cell cell : r.listCells()) {if(rowKey == null){rowKey = Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength());}columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}if(rowKey != null){result.put(rowKey,columnMap);}}}catch (IOException e) {log.error(MessageFormat.format("遍历查询指定表中的所有数据失败,tableName:{0}",tableName),e);}finally {close(null, rs, table, connection);}return result;}@Overridepublic Map<String,String> getRowData(String tableName, String rowKey){//返回的键值对Map<String,String> result = new HashMap<>();Get get = new Get(Bytes.toBytes(rowKey));// 获取表Table table= null;Connection connection = null;try {connection = pool.getConnection();table = getTable(connection, 
tableName);Result hTableResult = table.get(get);if (hTableResult != null && !hTableResult.isEmpty()) {for (Cell cell : hTableResult.listCells()) {result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}// 某些应用场景需要插入到数据库的时间if (hTableResult.listCells().size() > 0) {result.put("Timestamp", hTableResult.listCells().get(0).getTimestamp() + "");}}}catch (IOException e) {log.error(MessageFormat.format("查询一行的数据失败,tableName:{0},rowKey:{1}",tableName,rowKey),e);}finally {close(null,null, table, connection);}return result;}@Overridepublic Map<String,String> getFamilyValue(String tableName, String rowKey, String familyName){//返回的键值对Map<String,String> result = new HashMap<>(2);Get get = new Get(Bytes.toBytes(rowKey));get.addFamily(Bytes.toBytes(familyName));// 获取表Table table= null;Connection connection = null;try {connection = pool.getConnection();table = getTable(connection, tableName);Result getResult = table.get(get);if (getResult != null && !getResult.isEmpty()) {for (Cell cell : getResult.listCells()) {result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));}}} catch (IOException e) {log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2}", tableName, rowKey, familyName), e);}finally {close(null,null, table, connection);}return result;}@Overridepublic String getColumnValue(String tableName, String rowKey, String familyName, String columnName){String str = null;Get get = new Get(Bytes.toBytes(rowKey));// 获取表Table table= null;Connection connection = null;try {connection = pool.getConnection();table = getTable(connection, tableName);Result result = table.get(get);if (result != null && !result.isEmpty()) {Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), 
Bytes.toBytes(columnName));if(cell != null){str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}} catch (IOException e) {log.error(MessageFormat.format("查询指定单元格的数据失败,tableName:{0},rowKey:{1},familyName:{2},columnName:{3}",tableName,rowKey,familyName,columnName),e);}finally {close(null,null, table, connection);}return str;}private Table getTable(Connection connection, String tableName) throws IOException {Table table = connection.getTable(TableName.valueOf(tableName));return table;}private void close(Admin admin, ResultScanner rs, Table table, Connection connection){if(admin != null){try {admin.close();} catch (IOException e) {log.error("关闭Admin失败",e);}}if(rs != null){rs.close();}if(table != null){try {table.close();} catch (IOException e) {log.error("关闭Table失败",e);}}// 释放连接if (Objects.nonNull(connection)) {pool.releaseConnection(connection);}}}
ok,现在就可以操作使用了。
以前都是在非 Spring 环境下使用 HBase,每次调用都要在代码里手动获取一个连接。一开始会出现这样的问题:服务运行时间久了、某些使用 HBase 的接口被调用次数过多之后,会报【已超过最大的连接数】,只能在每个接口调用的最后一行手动加上释放连接的代码。
这次把释放连接的逻辑统一集成到操作服务类的实现方法中,避免了开发接口时遗漏释放连接的错误,这个问题应该不会再出现。