
Flume log collection pipeline (log -> Kafka -> HDFS)


Event-tracking data: when users access a business server such as Nginx, log4j is used to record the client-side tracking events as lines in a log file.

Server log file -> HDFS file

Log file -> Flume (agent: source (interceptor) + channel) -> Kafka topic ->
Flume (agent: source (interceptor) + channel + sink) -> HDFS file

Environment: Hadoop + ZooKeeper + Kafka + Flume

Prepare the log file on the local filesystem (the TAILDIR source tails local files; one JSON record per line).
Log file: appdemo.log

linux>vi appdemo.log
{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548955000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"6"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548957000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"8"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548959000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"17"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":""
,"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548960000"}{"eventid":"appClickEvent","event":{"screen_id":"344","screen_name":"","title":"","element_id":"4"},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegistered":"",
"isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548961000"}{"eventid":"clickChannelEvent","event":{"belongTab":"6","channelIndex":"6","channelID":"6","channelName":""},"user":{"uid":"245498","account":"","email":"","phoneNbr":"18248667380","birthday":"","isRegist
ered":"","isLogin":"","addr":"","gender":"","phone":{"imei":"2881993463620531","mac":"2e-80-50-8e-39-a1-1e","imsi":"8616932323350461","osName":"macos","osVer":"9.0","androidId":"","resolution":"1024*768","deviceType":"360_V","deviceId":"81Kau4","uuid":"L3whyU7BgtLKEkvE"},"app":{"appid":"cn.kgc.mall","appVer":"2.0.1","release_ch":"纽扣助手","promotion_ch":"12"},"loc":{"areacode":210921102,"longtitude":121.56605311428365,"latitude":41.91452099352481,"carrier":"ISP02","netType":"WIFI","cid_sn":"463485993989","ip":"138.117.92.76"},"sessionId":"sid-99fe7648-d8e4-4cbe-86af-17b5b3c3a7fc"},"timestamp":"1575548963000"}{"eventid":"appviewEvent","event":{"screen_id":"681","screen_name":"","title":""},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","isLogin":"","addr
":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595490000"}{"eventid":"webStayEvent","event":{"pgid":"681","title":"","url":"http://www.kgcedu.cn/acc/pg681"},"user":{"uid":"611429","account":"","email":"","phoneNbr":"15669989777","birthday":"","isRegistered":"","
isLogin":"","addr":"","gender":"","phone":{"imei":"2478250569537801","mac":"a1-27-fd-a6-9a-0a-c9","imsi":"5597152087882061","osName":"windows","osVer":"9.0","androidId":"","resolution":"800*600","deviceType":"XIAOLAJIAO","deviceId":"2Ry864","uuid":"EFVu1NVmld9Dpykq"},"app":{"appid":"cn.kgc.mall","appVer":"1.0.0","release_ch":"应用宝","promotion_ch":"04"},"loc":{"areacode":130403205,"longtitude":114.43980636414214,"latitude":36.690319995072464,"carrier":"ISP07","netType":"N","cid_sn":"021803165366","ip":"42.24.217.87"},"sessionId":"sid-e760ea4b-5016-4eb3-bd6c-a9eaa2f27314"},"timestamp":"1575595491000"}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test.flume</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
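Building the module with the assembly plugin produces the jar-with-dependencies artifact that is copied into flume/lib further below:

linux>mvn clean package
# output: target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar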

IDEA: write a JSON interceptor that keeps only valid JSON records

package flume.intercepter;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// First interceptor: keep only events whose body is a valid JSON record
public class JsonInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        /*
         * The unit of data passed through an agent is an event (header + body).
         * For a file source, one line is one event.
         */
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        /*
         * Check whether the record is a complete JSON record.
         */
        if (JsonUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new JsonInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
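The interceptor above calls JsonUtil.isJSONValidate, which is not shown in the original post. A minimal sketch using the fastjson dependency already declared in the pom, assuming "valid" simply means "the line parses as JSON":

package flume.intercepter;

import com.alibaba.fastjson.JSON;

// Helper assumed by JsonInterceptor: a line counts as valid if fastjson can parse it.
public class JsonUtil {
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);   // throws JSONException on malformed input
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}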

IDEA: write a timestamp interceptor (so that when Flume writes to HDFS, the partition time comes from the data's own timestamp rather than the time of the HDFS write)

package flume.intercepter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Second interceptor: extract the event's timestamp and put it into the headers
public class TimestampIntercepter implements Interceptor {

    List<Event> events = new ArrayList<>();

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();                 // get the headers
        String log = new String(event.getBody(), StandardCharsets.UTF_8); // get the body
        JSONObject jsonObject = JSON.parseObject(log);                    // parse the JSON
        String ts = jsonObject.getString("timestamp");
        headers.put("timestamp", ts);                                     // put it into the headers
        // Use the time recorded in the log itself; otherwise the files may be
        // written under the system time.
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        events.clear();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampIntercepter();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
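As a quick sanity check (hypothetical test code, not in the original post; it assumes flume-ng-core is available on the classpath), the interceptor can be exercised by hand to confirm it copies the JSON "timestamp" field into the headers:

package flume.intercepter;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class TimestampIntercepterDemo {
    public static void main(String[] args) {
        // Build one event by hand from a minimal JSON body
        Event e = EventBuilder.withBody(
                "{\"eventid\":\"appClickEvent\",\"timestamp\":\"1575548955000\"}",
                StandardCharsets.UTF_8);
        Event out = new TimestampIntercepter.Builder().build().intercept(e);
        // The HDFS sink later reads this header to fill %Y-%m-%d in hdfs.path
        System.out.println(out.getHeaders().get("timestamp"));  // prints 1575548955000
    }
}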

Write the Flume agent configuration files
LogToKafka:

linux>vi log_to_kafka.conf
## Name the components
logToKafka.sources = logtokafkasource
logToKafka.channels = logkafkachannel

# Describe the source
logToKafka.sources.logtokafkasource.type = TAILDIR
logToKafka.sources.logtokafkasource.filegroups = f1
logToKafka.sources.logtokafkasource.filegroups.f1 = /root/tmp_data/log/appdemo.*
logToKafka.sources.logtokafkasource.positionFile = /root/tmp_data/log/taildir_position.json
logToKafka.sources.logtokafkasource.interceptors = i1
logToKafka.sources.logtokafkasource.interceptors.i1.type = flume.intercepter.JsonInterceptor$Builder
# filegroups can list multiple file groups; positionFile records how far each file has been read

# Describe the channel
logToKafka.channels.logkafkachannel.type = org.apache.flume.channel.kafka.KafkaChannel
logToKafka.channels.logkafkachannel.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
logToKafka.channels.logkafkachannel.kafka.topic = appdemo_log
logToKafka.channels.logkafkachannel.parseAsFlumeEvent = false

# Bind the source to the channel (the KafkaChannel writes to Kafka directly, so this agent needs no sink)
logToKafka.sources.logtokafkasource.channels = logkafkachannel
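
If automatic topic creation is disabled on the brokers, the appdemo_log topic has to exist before this agent starts; a sketch (the partition and replication counts here are illustrative, not from the original post):

linux>kafka-topics.sh --create --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --partitions 3 --replication-factor 2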

KafkaToHDFS:

linux>vi kafka_to_hdfs.conf
## Components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
a1.sources.r1.kafka.topics = appdemo_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = flume.intercepter.TimestampIntercepter$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/tmp_data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /root/tmp_data/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/log/appdemo_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## Keep the output files as plain (uncompressed) files; uncomment the two lines below to write compressed files instead
#a1.sinks.k1.hdfs.fileType = CompressedStream
#a1.sinks.k1.hdfs.codeC = lzop

## Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
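
The %Y-%m-%d escape in hdfs.path is resolved from the timestamp header that TimestampIntercepter sets. hdfs.useLocalTimeStamp defaults to false, so the header time is used; it can be spelled out explicitly if preferred (this line is not in the original config):

a1.sinks.k1.hdfs.useLocalTimeStamp = false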

Note: before running kafka_to_hdfs.conf, the local directories used by the file channel must be created in advance.

a1.channels.c1.checkpointDir = /root/tmp_data/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /root/tmp_data/flume/data/behavior1/

The directories for these two settings must already exist:

linux>mkdir -p /root/tmp_data/flume/checkpoint/behavior1
linux>mkdir -p /root/tmp_data/flume/data/behavior1/

Note: after the interceptors are finished in IDEA, package them and put the jar into flume/lib

linux>mv flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/install/flume/lib

Run the Flume agent for LogToKafka

linux>bin/flume-ng agent --conf conf --conf-file log_to_kafka.conf --name logToKafka -Dflume.root.logger=INFO,console

Run the Flume agent for KafkaToHDFS

linux>bin/flume-ng agent --conf conf --conf-file kafka_to_hdfs.conf --name a1 -Dflume.root.logger=INFO,console

Error 1: after the agents run, no files appear in HDFS; the Kafka consumer offsets for the topic need to be reset, otherwise nothing is read or data is missing.

Reset offsets (older Kafka versions):
linux>kafka-streams-application-reset.sh --zookeeper 192.168.58.201:2181,192.168.58.202:2181,192.168.58.203:2181 --bootstrap-servers 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --application-id flume --input-topics appdemo_log
Count the records in a topic (older Kafka versions):
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1 --offsets 1
Reset offsets (newer Kafka versions):
linux>kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --group flume --reset-offsets --topic appdemo_log --to-earliest --execute
# List consumer groups
kafka-consumer-groups.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --list
# List all topics
kafka-topics.sh --list --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
# Describe a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --describe --topic appdemo_log
# Delete a topic
linux>kafka-topics.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --delete --topic appdemo_log
Count the records in a topic:
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1
# Consume messages (the port must match the one in the configuration file / the producer side)
linux>kafka-console-consumer.sh --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --from-beginning   # with --from-beginning, consume all messages from the start
linux>kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic appdemo_log        # without --from-beginning, start from the latest message
# Query the topic's offset range: the minimum offset
linux>kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -2
# Query the maximum offset:
linux>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic appdemo_log --time -1

Error 2: java.lang.OutOfMemoryError: Java heap space (insufficient memory)

### Fix the Flume OOM (out of memory) by raising the heap in flume-env.sh (in Flume's conf directory, picked up via --conf conf)
linux>vi flume-env.sh
export JAVA_OPTS="-Xms2048m -Xmx2048m -Xss256k -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

Result:
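
Once both agents have run, the date-partitioned output can be checked on HDFS (a sketch; the dated subdirectories come from the event timestamps in the sample log):

linux>hdfs dfs -ls /data/log/appdemo_log/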