Kafka数据导入ClickHouse

1.Kafka中数据导入ClickHouse的标准流程 在ClickHouse中建立Kafka Engine 外表,作为Kafka数据源的一个接口 在ClickHouse中创建普通表(通常是MergeTree系列)存储Kafka中的数据 在ClickHouse中创建Materialized View, 监听Kafka中的数据,并将数据写入ClickHouse存储表中 上述三个步骤,就可以将Kafka中的数据导入到ClickHouse集群中。 2. Kafka数据导入ClickHouse详细步骤 ClickHouse 提供了Kafka Engine 作为访问Kafka集群的一个接口(数据流)。有了这个接口后,导入数据就很方便了,具体步骤如下: 步骤1:创建Kafka Engine CREATE TABLE source ( `ts` DateTime, `tag` String, `message` String ) ENGINE = Kafka() SETTINGS kafka_broker_list = '172.19.0.47:9092', kafka_topic_list = 'tag', kafka_group_name = 'clickhouse', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 1, kafka_num_consumers = 2 必选参数: kafkabrokerlist: 这里填写Kafka服务的broker列表,用逗号分隔 kafkatopiclist: 这里填写Kafka topic,多个topic用逗号分隔 kafkagroupname:这里填写消费者group名称 kafkaformat__:Kafka数据格式, ClickHouse支持的Format, 详见这里 可选参数: kafkaskipbrokenmessages:填写大于等于0的整数,表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监听数据 kafkanumconsumers_: 单个Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数 kafkarowdelimiter: 消息分隔符 kafkaschema__:对于kafkaformat需要schema定义的时候,其schema由该参数确定 kafkamaxblocksize: 该参数控制Kafka数据写入目标表的Block大小,超过该数值后,就将数据刷盘。 步骤2:创建存储Kafka数据的目标表,该表就是最终存储Kafka数据 本文中,采用MergeTree来存储Kafka数据: CREATE TABLE target ( `ts` DateTime, `tag` String ) ENGINE = MergeTree() PARTITION BY toYYYYMM(ts) ORDER BY tag 步骤3:创建Metrialized View 抓取数据 本文中,采用如下语句创建MV: ...

2023年1月8日 · 1 分钟