Flume NG支持运行时动态修改配置的配置模块 细说一下PollingPropertiesFileConfigurationProvider提供的运行时动态修改配置并生效的能力。
要实现动态修改配置文件并生效,主要有两个待实现的功能
观察配置文件是否修改
如果修改,将修改的内容通知给观察者
对于第一点,监控配置文件是否修改,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程采用定时轮询的方式来监控,轮询频率是30毫秒一次,比较file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被修改
public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); } 对于第二点,利用Guava EventBus提供的发布订阅模式机制,将配置修改封装成事件传递给Application,来重新加载配置
...