kafka集群运行节点运行不成功

1.现象 由于zookeeper挂掉,造成kafka出现:There are 60 offline partitions。 2.造成的原因 经过排查发现,由于kafka之前Topic在zookeeper中的数据还在,再重新建立会产生冲突导致失败。 3.解决方案 进入Zookeeper中将之前的脏数据删掉再重启kafka。 #1.进入zookeeper sh /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/zookeeper/bin/zkCli.sh #2.删除掉脏数据 deleteall /brokers/topics

2024年10月26日 · 1 分钟

kafka在zk中的目录结构

2023年1月22日 · 0 分钟

Kafka数据导入ClickHouse

1.Kafka中数据导入ClickHouse的标准流程 在ClickHouse中建立Kafka Engine 外表,作为Kafka数据源的一个接口 在ClickHouse中创建普通表(通常是MergeTree系列)存储Kafka中的数据 在ClickHouse中创建Materialized View, 监听Kafka中的数据,并将数据写入ClickHouse存储表中 上述三个步骤,就可以将Kafka中的数据导入到ClickHouse集群中。 2. Kafka数据导入ClickHouse详细步骤 ClickHouse 提供了Kafka Engine 作为访问Kafka集群的一个接口(数据流)。有了这个接口后,导入数据就很方便了,具体步骤如下: 步骤1:创建Kafka Engine CREATE TABLE source ( `ts` DateTime, `tag` String, `message` String ) ENGINE = Kafka() SETTINGS kafka_broker_list = '172.19.0.47:9092', kafka_topic_list = 'tag', kafka_group_name = 'clickhouse', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 1, kafka_num_consumers = 2 必选参数: kafkabrokerlist: 这里填写Kafka服务的broker列表,用逗号分隔 kafkatopiclist: 这里填写Kafka topic,多个topic用逗号分隔 kafkagroupname:这里填写消费者group名称 kafkaformat__:Kafka数据格式, ClickHouse支持的Format, 详见这里 可选参数: kafkaskipbrokenmessages:填写大于等于0的整数,表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监听数据 kafkanumconsumers_: 单个Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数 kafkarowdelimiter: 消息分隔符 kafkaschema__:对于kafkaformat需要schema定义的时候,其schema由该参数确定 kafkamaxblocksize: 该参数控制Kafka数据写入目标表的Block大小,超过该数值后,就将数据刷盘。 步骤2:创建存储Kafka数据的目标表,该表就是最终存储Kafka数据 本文中,采用MergeTree来存储Kafka数据: CREATE TABLE target ( `ts` DateTime, `tag` String ) ENGINE = MergeTree() PARTITION BY toYYYYMM(ts) ORDER BY tag 步骤3:创建Metrialized View 抓取数据 本文中,采用如下语句创建MV: ...

2023年1月8日 · 1 分钟

Kafka 中的控制器组件

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。官网上有个名为 activeController 的 JMX 指标,可以帮助我们实时监控控制器的存活状态。这个 JMX 指标非常关键,你在实际运维操作过程中,一定要实时查看这个指标的值。下面,我们就来详细说说控制器的原理和内部运行机制。 在开始之前,我先简单介绍一下 Apache ZooKeeper 框架。要知道,控制器是重度依赖 ZooKeeper 的,因此,我们有必要花一些时间学习下 ZooKeeper 是做什么的。 Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以 “/” 开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。 如果以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode。持久性 znode 不会因为 ZooKeeper 集群重启而消失,==而临时 znode 则与创建该 znode 的 ZooKeeper 会话绑定,一旦会话结束,该节点会被自动删除==。 ==ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。== 依托于这些功能,ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。==Kafka 控制器大量使用 Watch 功能实现对集群的协调管理==。我们一起来看一张图片,它展示的是 Kafka 在 ZooKeeper 中创建的 znode 分布。你不用了解每个 znode 的作用,但你可以大致体会下 Kafka 对 ZooKeeper 的依赖。 ...

2022年12月25日 · 2 分钟

Kafka 常见的脚本汇总

命令行脚本概览 Kafka 默认提供了很多个命令行脚本,用于实现各种各样的功能和运维管理。今天我以 2.2 版本为例,详细地盘点下这些命令行工具。下图展示了 2.2 版本提供的所有命令行脚本。 从图中我们可以知道,2.2 版本总共提供了 30 个 SHELL 脚本。图中的 windows 实际上是个子目录,里面保存了 Windows 平台下的 BAT 批处理文件。其他的. sh 文件则是 Linux 平台下的标准 SHELL 脚本。 默认情况下,不加任何参数或携带 –help 运行 SHELL 文件,会得到该脚本的使用方法说明。下面这张图片展示了 kafka-log-dirs 脚本的调用方法。 有了这些基础的了解,我来逐一地说明这些脚本的用途,然后再给你详细地介绍一些常见的脚本。 我们先来说说 connect-standalone 和 connect-distributed 两个脚本。这两个脚本是 Kafka Connect 组件的启动脚本。在专栏第 4 讲谈到 Kafka 生态时,我曾说过社区提供了 Kafka Connect 组件,用于实现 Kafka 与外部世界系统之间的数据传输。Kafka Connect 支持单节点的 Standalone 模式,也支持多节点的 Distributed 模式。这两个脚本分别是这两种模式下的启动脚本。鉴于 Kafka Connect 不在我们的讨论范围之内,我就不展开讲了。 接下来是 kafka-acls 脚本。它是用于设置 Kafka 权限的,比如设置哪些用户可以访问 Kafka 的哪些主题之类的权限。在专栏后面,我会专门来讲 Kafka 安全设置的内容,到时候我们再细聊这个脚本。 ...

2022年12月11日 · 5 分钟

如何重设消费者组位移

为什么要重设消费者组位移? 我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。 像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。 反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。 对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。 重设位移策略 不论是哪种设置方式,重设位移大致可以从两个维度来进行。 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。 下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。 Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。 Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。 Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。 表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地 “跳过” 此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的 “跳” 是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。 ...

2022年4月17日 · 3 分钟

如何调优 Kafka

调优目标 在做调优之前,我们必须明确优化 Kafka 的目标是什么。通常来说,调优是为了满足系统常见的非功能性需求。在众多的非功能性需求中,性能绝对是我们最关心的那一个。不同的系统对性能有不同的诉求,比如对于数据库用户而言,性能意味着请求的响应时间,用户总是希望查询或更新请求能够被更快地处理完并返回。 对 Kafka 而言,性能一般是指吞吐量和延时。 吞吐量,也就是 TPS,是指 Broker 端进程或 Client 端应用程序每秒能处理的字节数或消息数,这个值自然是越大越好。 延时和我们刚才说的响应时间类似,它表示从 Producer 端发送消息到 Broker 端持久化完成之间的时间间隔。这个指标也可以代表端到端的延时(End-to-End,E2E),也就是从 Producer 发送消息到 Consumer 成功消费该消息的总时长。和 TPS 相反,我们通常希望延时越短越好。 总之,高吞吐量、低延时是我们调优 Kafka 集群的主要目标,一会儿我们会详细讨论如何达成这些目标。在此之前,我想先谈一谈优化漏斗的问题。 优化漏斗 优化漏斗是一个调优过程中的分层漏斗,我们可以在每一层上执行相应的优化调整。总体来说,层级越靠上,其调优的效果越明显,整体优化效果是自上而下衰减的,如下图所示: 第 1 层:应用程序层。它是指优化 Kafka 客户端应用程序代码。比如,使用合理的数据结构、缓存计算开销大的运算结果,抑或是复用构造成本高的对象实例等。这一层的优化效果最为明显,通常也是比较简单的。 第 2 层:框架层。它指的是合理设置 Kafka 集群的各种参数。毕竟,直接修改 Kafka 源码进行调优并不容易,但根据实际场景恰当地配置关键参数的值,还是很容易实现的。 第 3 层:JVM 层。Kafka Broker 进程是普通的 JVM 进程,各种对 JVM 的优化在这里也是适用的。优化这一层的效果虽然比不上前两层,但有时也能带来巨大的改善效果。 第 4 层:操作系统层。对操作系统层的优化很重要,但效果往往不如想象得那么好。与应用程序层的优化效果相比,它是有很大差距的。 基础性调优 接下来,我就来分别介绍一下优化漏斗的 4 个分层的调优。 操作系统调优 我先来说说操作系统层的调优。在操作系统层面,你最好在挂载(Mount)文件系统时禁掉 atime 更新。atime 的全称是 access time,记录的是文件最后被访问的时间。记录 atime 需要操作系统访问 inode 资源,而禁掉 atime 可以避免 inode 访问时间的写入操作,减少文件系统的写操作数。你可以执行 mount -o noatime 命令进行设置。 ...

2022年4月3日 · 3 分钟