Flume内存溢出卡死
问题:agent启动后跑到一半报错卡死 解决:修改flume_ng启动脚本中jvm参数: vi /opt/cloudera/parcels/CDH/lib/flume-ng/bin/flume-ng 把 JAVA_OPTS="-Xmx20m" 改为 JAVA_OPTS="-Xmx2048m" 重启agent,顺畅running
问题:agent启动后跑到一半报错卡死 解决:修改flume_ng启动脚本中jvm参数: vi /opt/cloudera/parcels/CDH/lib/flume-ng/bin/flume-ng 把 JAVA_OPTS="-Xmx20m" 改为 JAVA_OPTS="-Xmx2048m" 重启agent,顺畅running
问题描述:flume采集到hdfs上的文件一直不关闭,有tmp后缀,hive读不到 原配置: # 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为24小时防止产生小文件 ex_trade_agent.sinks.k1.hdfs.rollInterval = 86400 # 当前文件写入达到该大小后触发滚动创建新文件,单位:字节,设置为128M ex_trade_agent.sinks.k1.hdfs.rollSize = 134217700 # 向 HDFS 写入内容时每次批量操作的 Event 数量 ex_trade_agent.sinks.k1.hdfs.batchSize = 2000 # 不根据 Event 数量来分割文件 ex_trade_agent.sinks.k1.hdfs.rollCount = 0 可以看到只按照128m和24小时来判断是否写新文件,如果两者都不满足那就不关闭临时文件 解决方案: # 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为4小时防止产生小文件 ex_trade_agent.sinks.k1.hdfs.rollInterval = 14400 # 当非活动文件超过4小时,关闭该文件 ex_trade_agent.sinks.k1.hdfs.idleTimeout = 14400
conf: # ex_trade.conf:外贸数据自kafka接入hdfs # 配置负责人:褚天宇 # 配置时间: 2022-01-17 # 配置Agent ex_trade_agent各个组件的名称 ex_trade_agent.sources = r1 ex_trade_agent.sinks = k1 ex_trade_agent.channels = c1 # 配置Agent ex_trade_agent的source r1的属性 # 设置kafka源 ex_trade_agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource # 设置kafka消费者组id (不能改动!改动会丢失已记录的offset) ex_trade_agent.sources.r1.kafka.consumer.group.id = ex_trade_flume ex_trade_agent.sources.r1.kafka.bootstrap.servers = 192.168.102.2:9092,192.168.102.3:9092,192.168.102.7:9092 ex_trade_agent.sources.r1.kafka.topics = ex_trade # 一批写入 channel 的最大消息数 ex_trade_agent.sources.r1.batchSize = 2000 # 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作 ex_trade_agent.sources.r1.batchDurationMillis = 2000 # 设置kafka-offset策略为从头消费 ex_trade_agent.sources.r1.kafka.consumer.auto.offset.reset = earliest # 配置Agent ex_trade_agent的sink k1的属性 # 配置hdfs目标 ex_trade_agent.sinks.k1.type = hdfs # 配置hdfs写入目录,每天一个文件夹,为之后hive表按日分区做准备 ex_trade_agent.sinks.k1.hdfs.path = /flume/ex_trade/date=%Y%m%d/ # 配置文件前缀 ex_trade_agent.sinks.k1.hdfs.filePrefix = ex_trade_ # 配置数据类型为原样输出 ex_trade_agent.sinks.k1.hdfs.fileType = DataStream # 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为1天防止产生小文件 ex_trade_agent.sinks.k1.hdfs.rollInterval = 86400 # 当前文件写入达到该大小后触发滚动创建新文件,单位:字节,设置为128M ex_trade_agent.sinks.k1.hdfs.rollSize = 134217700 # 向 HDFS 写入内容时每次批量操作的 Event 数量 ex_trade_agent.sinks.k1.hdfs.batchSize = 2000 # 不根据 Event 数量来分割文件 ex_trade_agent.sinks.k1.hdfs.rollCount = 0 # 文件写入格式设置为 Text,否则 Impala或 Apache Hive 无法读取这些文件。 ex_trade_agent.sinks.k1.hdfs.writeFormat = Text # 配置Agent ex_trade_agent的channel c1的属性,用来缓冲数据 ex_trade_agent.channels.c1.type = file # 配置通道的检查点目录 ex_trade_agent.channels.c1.checkpointDir = /data/deploy/flume/checkpoint # 配置通道的数据目录 ex_trade_agent.channels.c1.dataDirs = /data/deploy/flume/data # 把source和sink绑定到channel上 ex_trade_agent.sources.r1.channels = c1 ex_trade_agent.sinks.k1.channel = c1 启动脚本: ...
Flume NG支持运行时动态修改配置的配置模块 细说一下PollingPropertiesFileConfigurationProvider提供的运行时动态修改配置并生效的能力。 要实现动态修改配置文件并生效,主要有两个待实现的功能 观察配置文件是否修改 如果修改,将修改的内容通知给观察者 对于第一点,监控配置文件是否修改,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程采用定时轮询的方式来监控,轮询频率是30毫秒一次,比较file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被修改 public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); } 对于第二点,利用Guava EventBus提供的发布订阅模式机制,将配置修改封装成事件传递给Application,来重新加载配置 ...