Kafka 中的控制器组件

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。官网上有个名为 activeController 的 JMX 指标,可以帮助我们实时监控控制器的存活状态。这个 JMX 指标非常关键,你在实际运维操作过程中,一定要实时查看这个指标的值。下面,我们就来详细说说控制器的原理和内部运行机制。 在开始之前,我先简单介绍一下 Apache ZooKeeper 框架。要知道,控制器是重度依赖 ZooKeeper 的,因此,我们有必要花一些时间学习下 ZooKeeper 是做什么的。 Apache ZooKeeper 是一个提供高可靠性的分布式协调服务框架。它使用的数据模型类似于文件系统的树形结构,根目录也是以 “/” 开始。该结构上的每个节点被称为 znode,用来保存一些元数据协调信息。 如果以 znode 持久性来划分,znode 可分为持久性 znode 和临时 znode。持久性 znode 不会因为 ZooKeeper 集群重启而消失,==而临时 znode 则与创建该 znode 的 ZooKeeper 会话绑定,一旦会话结束,该节点会被自动删除==。 ==ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。== 依托于这些功能,ZooKeeper 常被用来实现集群成员管理、分布式锁、领导者选举等功能。==Kafka 控制器大量使用 Watch 功能实现对集群的协调管理==。我们一起来看一张图片,它展示的是 Kafka 在 ZooKeeper 中创建的 znode 分布。你不用了解每个 znode 的作用,但你可以大致体会下 Kafka 对 ZooKeeper 的依赖。 ...

2022年12月25日 · 2 分钟

Kafka 常见的脚本汇总

命令行脚本概览 Kafka 默认提供了很多个命令行脚本,用于实现各种各样的功能和运维管理。今天我以 2.2 版本为例,详细地盘点下这些命令行工具。下图展示了 2.2 版本提供的所有命令行脚本。 从图中我们可以知道,2.2 版本总共提供了 30 个 SHELL 脚本。图中的 windows 实际上是个子目录,里面保存了 Windows 平台下的 BAT 批处理文件。其他的. sh 文件则是 Linux 平台下的标准 SHELL 脚本。 默认情况下,不加任何参数或携带 –help 运行 SHELL 文件,会得到该脚本的使用方法说明。下面这张图片展示了 kafka-log-dirs 脚本的调用方法。 有了这些基础的了解,我来逐一地说明这些脚本的用途,然后再给你详细地介绍一些常见的脚本。 我们先来说说 connect-standalone 和 connect-distributed 两个脚本。这两个脚本是 Kafka Connect 组件的启动脚本。在专栏第 4 讲谈到 Kafka 生态时,我曾说过社区提供了 Kafka Connect 组件,用于实现 Kafka 与外部世界系统之间的数据传输。Kafka Connect 支持单节点的 Standalone 模式,也支持多节点的 Distributed 模式。这两个脚本分别是这两种模式下的启动脚本。鉴于 Kafka Connect 不在我们的讨论范围之内,我就不展开讲了。 接下来是 kafka-acls 脚本。它是用于设置 Kafka 权限的,比如设置哪些用户可以访问 Kafka 的哪些主题之类的权限。在专栏后面,我会专门来讲 Kafka 安全设置的内容,到时候我们再细聊这个脚本。 ...

2022年12月11日 · 5 分钟

impala配置调优

Impala Daemon 命令行参数高级配置代码段(安全阀) -use_local_tz_for_unix_timestamp_conversions=true -convert_legacy_hive_parquet_utc_timestamps=true 在hive中,一个中文字符长度为1 在impala中,一个中文字符长度为3

2022年11月27日 · 1 分钟

Hive注册udf

1.创建临时函数 hive> add jar /home/hadoop/bigdata_udf.jar; hive> create temporary function isContains100 as 'com.xx.hive.udf.hm2.IsContains100'; --验证 hive> select isContains100(t.col1) from t limit 10; hive> drop temporary function isContains100; 2.创建永久函数 hadoop fs -put /opt/bigdata_udf.jar /udf hive> create function default.url_decode as 'com.xx.udf.DecodeURL' using jar 'hdfs:///udf/bigdata_udf.jar'; --验证 hive> select default.url_decode(t.col1) from t limit 10; 注意:注册永久函数必须使用hdfs路径,不可使用本地路径

2022年11月13日 · 1 分钟

Hive映射HBase数据源

CREATE EXTERNAL TABLE `mongodb_dingtalk.hbasetohive_patent`( key String, abstracts String, address String) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:abstracts,info:address") TBLPROPERTIES("[hbase.table.name](http://hbase.table.name/)" = "dingtalk:patent"); 仅从HBase拉取数据使用,禁止利用此种方式往HBase写数据 数据拉出时拉出hbase中数据对应当前版本的时间戳 create external table ods.ods_zxk_hbase_wechat_public_account_mapping( id string comment '需要拆分的key', ts timestamp comment '数据标识符,用来跟新数据做去重') comment '高级搜索有无微信公众号' STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,:timestamp') TBLPROPERTIES('[hbase.table.name](http://hbase.table.name/)' = 'dingtalk:wechat_public_account')

2022年10月30日 · 1 分钟

hive映射es

一 hive导入es 1 创建hive-es映射表 CREATE EXTERNAL TABLE hive_es.re_run_test2( id STRING ,test STRING) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 're_run_test2/test', 'es.nodes'='172.16.98.113,172.16.98.149,172.16.98.150,172.16.98.151,172.16.98.152', 'es.port'='9200', 'es.mapping.id' ='id') 注: 1. es.resource对应es中的index/type 2. 1.es.mapping.names为hive和es字段名映射关系。 2.如果hive表和es表字段名完全一致,可以省略此参数。 3.hive中字段名不区分大小写,元数据寸的全是小写;es中字段大小写敏感,如果es中字段名出现大写,需认真填写。 4.es中_id为自动生成,如若需要覆盖,需加参数'es.mapping.id'='id' 2 先导入es映射表相关jar包 add jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/auxlib/elasticsearch-hadoop-6.3.0.jar; add jar /data/jar/httpclient-4.5.5.jar; add jar /data/jar/org.apache.commons.httpclient.jar; 3 向映射表insert数据 二 es导入hive 1 建hive映射表 CREATE EXTERNAL TABLE hive_es.cty_test5( addTime string ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'cty_test/cty_test', 'es.nodes'='172.16.98.113,172.16.98.149,172.16.98.150,172.16.98.151,172.16.98.152', 'es.port'='9200', 'es.mapping.names'= 'addTime:addTime', 'es.mapping.date.rich'='false', 'es.index.auto.create'='false', ) 注意,hive表数据类型要和es一致,除了es的date要转成hive的string,同时要加参数’es.mapping.date.rich’=‘false’,否则查询会报错. 2 通过映射表向其他表insert 参考: https://www.cnblogs.com/koushr/p/9505435.html

2022年10月16日 · 1 分钟

hive设置spark参数

set spark.master=yarn-cluster; #设置spark提交模式 set hive.execution.engine=spark; #设置计算引擎 set spark.yarn.queue=queue_name; #设置作业提交队列 set spark.app.name=job_name; #设置作业名称 set spark.executor.instances=20; #设置执行器个数 set spark.executor.cores=4; #设置执行器计算核个数 set spark.executor.memory=8g; #设置执行器内存 set mapred.reduce.tasks=600; #设置任务并行度 set spark.yarn.executor.memoryOverhead=2048; #设置每个executor的jvm堆外内存 set spark.memory.fraction=0.8; #设置内存比例(spark2.0+) set spark.serializer=org.apache.serializer.KyroSerializer; #设置对象序列化方式

2022年10月2日 · 1 分钟

Hive调优大全

调优具体细节 Hive建表设计层面 Hive 的建表设计层面调优,主要讲的怎么样合理的组织数据,方便后续的高效计算。比如建表的类型,文件存储格式,是否压缩等等。 利用分区表优化 关于 Hive 的表的类型有哪些? 1、分区表 2、分桶表 分区表 是在某一个或者几个维度上对数据进行分类存储,一个分区对应一个目录。如果筛选条件里有分区字段,那么 Hive 只需要遍历对应分区目录下的文件即可,不需要遍历全局数据,使得处理的数据量大大减少,从而提高查询效率。 也就是说:当一个 Hive 表的查询大多数情况下,会根据某一个字段进行筛选时,那么非常适合创建为分区表,该字段即为分区字段。 select1: select .... where country = "china" select2: select .... where country = "china" select3: select .... where country = "china" select4: select .... where country = "china" ..... 分门别类:这个city字段的每个值,就单独形成为一个分区。其实每个分区就对应带HDFS的一个目录 在创建表时通过启用 partitioned by 实现,用来 partition 的维度并不是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时可以采用 where 语句,形似 where tablename.partition_column = a 来实现。 1、创建含分区的表: CREATE TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') PARTITIONED BY(date STRING, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' STORED AS TEXTFILE; 2、载入内容,并指定分区标志: ...

2022年9月18日 · 14 分钟

Hive常用参数语句

1.动态分区 SET hive.exec.dynamic.partition=true; SET hive.exec.dynamic.partition.mode=nonstrict; SET hive.exec.max.dynamic.partitions=100000; SET hive.exec.max.dynamic.partitions.pernode=100000; SET hive.exec.max.created.files=100000; 2.union all并发执行 --在使用union all的时候,系统资源足够的情况下,为了加快hive处理速度,可以设置如下参数实现并发执行 set mapred.job.priority=VERY_HIGH; set hive.exec.parallel=true; 3.设置map reduce个数 -- 设置map capacity set mapred.job.map.capacity=2000; set mapred.job.reduce.capacity=2000; -- 设置每个reduce的大小 set hive.exec.reducers.bytes.per.reducer=500000000; -- 直接设置个数 set mapred.reduce.tasks = 15; 4.文件合并 -- 设置文件合并 set abaci.is.dag.job=false; set hive.merge.mapredfiles=true; set mapred.combine.input.format.local.only=false; set hive.merge.smallfiles.avgsize=100000000; -- 在map only的情况下,如上的参数如果没有生效,可以设置如下 -- 在HQL的最外层增加distribute by rand() select * from XXX distribute by rand() 5.设置任务名称 -- 设置名称 set mapred.job.name=${my_job}; 6.设置引擎和指定队列 set hive.execution.engine=mr; set mapreduce.job.queuename=bigdata;

2022年9月4日 · 1 分钟

Hive-mongo导入导出

1.MongoDB拉出到hive # 导入mongodb的包到hadoop add jar /var/lib/hadoop-hdfs/bin/hive_mongoDB/mongo-hadoop-core-2.0.2.jar; add jar /var/lib/hadoop-hdfs/bin/hive_mongoDB/mongo-hadoop-hive-2.0.2.jar; add jar /var/lib/hadoop-hdfs/bin/hive_mongoDB/mongo-java-driver-3.12.8.jar; set mongo.input.split.create_input_splits=false; DROP TABLE IF EXISTS ods.ods_ex_trade_mdb_wmb_rocket_waimao_company_info_mapping; drop table ods.ods_zxk_annualBusiness_mapping; create external table ods.ods_zxk_annualBusiness_mapping( `_id` string, companyName string, Tel string )STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler' WITH SERDEPROPERTIES('mongo.columns.mapping'='{"_id":"_id","companyName":"companyName","Tel":"Tel"}') TBLPROPERTIES('mongo.uri'='mongodb://username:password@172.16.98.159:21000/annualReport.annualBusiness?authSource=admin');

2022年8月21日 · 1 分钟