conf:
# ex_trade.conf:外贸数据自kafka接入hdfs
# 配置负责人:褚天宇
# 配置时间: 2022-01-17
# 配置Agent ex_trade_agent各个组件的名称
ex_trade_agent.sources = r1
ex_trade_agent.sinks = k1
ex_trade_agent.channels = c1
# 配置Agent ex_trade_agent的source r1的属性
# 设置kafka源
ex_trade_agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 设置kafka消费者组id (不能改动!改动会丢失已记录的offset)
ex_trade_agent.sources.r1.kafka.consumer.group.id = ex_trade_flume
ex_trade_agent.sources.r1.kafka.bootstrap.servers = 192.168.102.2:9092,192.168.102.3:9092,192.168.102.7:9092
ex_trade_agent.sources.r1.kafka.topics = ex_trade
# 一批写入 channel 的最大消息数
ex_trade_agent.sources.r1.batchSize = 2000
# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作
ex_trade_agent.sources.r1.batchDurationMillis = 2000
# 设置kafka-offset策略为从头消费
ex_trade_agent.sources.r1.kafka.consumer.auto.offset.reset = earliest
# 配置Agent ex_trade_agent的sink k1的属性
# 配置hdfs目标
ex_trade_agent.sinks.k1.type = hdfs
# 配置hdfs写入目录,每天一个文件夹,为之后hive表按日分区做准备
ex_trade_agent.sinks.k1.hdfs.path = /flume/ex_trade/date=%Y%m%d/
# 配置文件前缀
ex_trade_agent.sinks.k1.hdfs.filePrefix = ex_trade_
# 配置数据类型为原样输出
ex_trade_agent.sinks.k1.hdfs.fileType = DataStream
# 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为1天防止产生小文件
ex_trade_agent.sinks.k1.hdfs.rollInterval = 86400
# 当前文件写入达到该大小后触发滚动创建新文件,单位:字节,设置为128M
ex_trade_agent.sinks.k1.hdfs.rollSize = 134217700
# 向 HDFS 写入内容时每次批量操作的 Event 数量
ex_trade_agent.sinks.k1.hdfs.batchSize = 2000
# 不根据 Event 数量来分割文件
ex_trade_agent.sinks.k1.hdfs.rollCount = 0
# 文件写入格式设置为 Text,否则 Impala或 Apache Hive 无法读取这些文件。
ex_trade_agent.sinks.k1.hdfs.writeFormat = Text
# 配置Agent ex_trade_agent的channel c1的属性,用来缓冲数据
ex_trade_agent.channels.c1.type = file
# 配置通道的检查点目录
ex_trade_agent.channels.c1.checkpointDir = /data/deploy/flume/checkpoint
# 配置通道的数据目录
ex_trade_agent.channels.c1.dataDirs = /data/deploy/flume/data
# 把source和sink绑定到channel上
ex_trade_agent.sources.r1.channels = c1
ex_trade_agent.sinks.k1.channel = c1
启动脚本:
#!/bin/bash
#ex_trade-hdfs34560.sh 34560为agent监控web端口号,下次再启动别的agent使用端口号为34561,依次累加
#负责人:褚天宇
#note:外贸数据接入hdfs的flume脚本
#参数:可选start|stop|restart
function start(){
echo "正在启动flume进程:ex_trade_agent......"
num=`ps -ef|grep java|grep ex_trade_agent|wc -l`
#判断是否有flume进程运行,如果无则运行执行nohup命令
if [ "$num" = "0" ] ;then
cd /data/deploy/flume/log/ex_trade_agent/
nohup flume-ng agent --conf /data/deploy/flume/conf/ --conf-file /data/deploy/flume/conf/ex_trade.conf --name ex_trade_agent -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34560 &
echo "启动ex_trade_agent成功!"
echo "日志路径: /data/deploy/flume/log/ex_trade_agent/nohup.out"
else
echo "ex_trade_agent进程已经存在,启动失败,请检查......"
exit 0
fi
}
function stop(){
echo "正在停止flume进程:ex_trade_agent......"
num=`ps -ef|grep java|grep ex_trade_agent|wc -l`
if [ "$num" != "0" ];then
ps -ef|grep java|grep ex_trade_agent|awk '{print $2;}'|xargs kill
echo "ex_trade_agent进程已经关闭......"
else
echo "ex_trade_agent进程未启动,无须停止......"
fi
}
function restart(){
stop
num=`ps -ef|grep java|grep ex_trade_agent|wc -l`
#stop完成之后,查找flume的进程数,判断进程数是否为0,如果不为0,则休眠5秒,再次查看,直到进程数为0
while [ "$num" != "0" ];do
sleep 5
num=`ps -ef|grep java|grep ex_trade_agent|wc -l`
done
echo "ex_trade_agent进程已经关闭,正在重新启动......"
start
echo "启动ex_trade_agent成功!"
}
#case 命令获取输入的参数,如果参数为start,执行start函数,如果参数为stop执行stop函数,如果参数为restart,执行restart函数
case "$1" in
"start")
start
;;
"stop")
stop
;;
"restart")
restart
;;
*)
;;
esac