Hive 字段中文注释乱码

Hive 字段中文乱码,如执行 show create table xxx 时,表级别注释、字段级别注释发现有乱码现象(都是????), 一般都是由 hive 元数据库的配置不当造成的。 此时可按如下步骤进行配置调整: 登录 hive 的元数据库 mysql 中: 1、设置 hive 元数据库字符集 show create database hive; 查看为 utf8,需变更为 latin1 _alter database hive character set latin1; 2、更改如下表字段为字符集编码为 utf8 ①修改表字段注解和表注解 alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8 alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8 ② 修改分区字段注解: alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8 ; alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8; ③修改索引注解: alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8; ...

2022年8月7日 · 1 分钟

Hive 数仓建表该选用 ORC 还是 Parquet,压缩选 LZO 还是 Snappy

在数仓中,建议大家除了接口表(从其他数据库导入或者是最后要导出到其他数据库的表),其余表的存储格式与压缩格式保持一致。 在数仓中,建议大家除了接口表(从其他数据库导入或者是最后要导出到其他数据库的表),其余表的存储格式与压缩格式保持一致。 我们先来说一下目前 Hive 表主流的存储格式与压缩方式 从 Hive 官网得知,Apache Hive 支持 Apache Hadoop 中使用的几种熟悉的文件格式,如 TextFile(文本格式),RCFile(行列式文件),SequenceFile(二进制序列化文件),AVRO,ORC(优化的行列式文件)和Parquet 格式,而这其中我们目前使用最多的是TextFile,SequenceFile,ORC和Parquet。 下面来详细了解下这 2 种行列式存储。 1、ORC 1.1 ORC 的存储结构 我们先从官网上拿到 ORC 的存储模型图 看起来略微有点复杂,那我们稍微简化一下,我画了一个简单的图来说明一下 但是由于索引的高成本,在**「目前的 Hive3.X 中,已经废除了索引」**,当然也早就引入了列式存储。 列式存储的存储方式,是按照一列一列存储的,如上图中的右图,这样的话如果查询一个字段的数据,就等于是索引查询,效率高。但是如果需要查全表,它因为需要分别取所有的列最后汇总,反而更占用资源。于是 ORC 行列式存储出现了。 在需要全表扫描时,可以按照行组读取 如果需要取列数据,在行组的基础上,读取指定的列,而不需要所有行组内所有行的数据和一行内所有字段的数据。 了解了 ORC 存储的基本逻辑后,我们再来看看它的存储模型图。 同时我也把详细的文字也附在下面,大家可以对照着看看: 条带 (stripe):ORC 文件存储数据的地方,每个 stripe 一般为 HDFS 的块大小。(包含以下 3 部分) index data:保存了所在条带的一些统计信息,以及数据在 stripe中的位置索引信息。 rows data:数据存储的地方,由多个行组构成,每10000行构成一个行组,数据以流( stream)的形式进行存储。 stripe footer:保存数据所在的文件目录 文件脚注 (file footer):包含了文件中 sipe 的列表, 每个 stripe 的行数, 以及每个列的数据类型。它还包含每个列的最小值、最大值、行计数、求和等聚合信息。 postscript:含有压缩参数和压缩大小相关的信息 所以其实发现,ORC 提供了 3 级索引,文件级、条带级、行组级,所以在查询的时候,利用这些索引可以规避大部分不满足查询条件的文件和数据块。 ...

2022年7月24日 · 3 分钟

hadoop默认端口

端口 用途 9000 fs.defaultFS,如:hdfs://172.25.40.171:9000 9001 dfs.namenode.rpc-address,DataNode会连接这个端口 50070 dfs.namenode.http-address 50470 dfs.namenode.https-address 50100 dfs.namenode.backup.address 50105 dfs.namenode.backup.http-address 50090 dfs.namenode.secondary.http-address,如:172.25.39.166:50090 50091 dfs.namenode.secondary.https-address,如:172.25.39.166:50091 50020 dfs.datanode.ipc.address 50075 dfs.datanode.http.address 50475 dfs.datanode.https.address 50010 dfs.datanode.address,DataNode的数据传输端口 8480 dfs.journalnode.rpc-address 8481 dfs.journalnode.https-address 8032 yarn.resourcemanager.address 8088 yarn.resourcemanager.webapp.address,YARN的http端口 8090 yarn.resourcemanager.webapp.https.address 8030 yarn.resourcemanager.scheduler.address 8031 yarn.resourcemanager.resource-tracker.address 8033 yarn.resourcemanager.admin.address 8042 yarn.nodemanager.webapp.address 8040 yarn.nodemanager.localizer.address 8188 yarn.timeline-service.webapp.address 10020 mapreduce.jobhistory.address 19888 mapreduce.jobhistory.webapp.address 2888 ZooKeeper,如果是Leader,用来监听Follower的连接 3888 ZooKeeper,用于Leader选举 2181 ZooKeeper,用来监听客户端的连接 60010 hbase.master.info.port,HMaster的http端口 60000 hbase.master.port,HMaster的RPC端口 60030 hbase.regionserver.info.port,HRegionServer的http端口 60020 hbase.regionserver.port,HRegionServer的RPC端口 8080 hbase.rest.port,HBase REST server的端口 10000 hive.server2.thrift.port 9083 hive.metastore.uris

2022年7月10日 · 1 分钟

flume示例

conf: # ex_trade.conf:外贸数据自kafka接入hdfs # 配置负责人:褚天宇 # 配置时间: 2022-01-17 # 配置Agent ex_trade_agent各个组件的名称 ex_trade_agent.sources = r1 ex_trade_agent.sinks = k1 ex_trade_agent.channels = c1 # 配置Agent ex_trade_agent的source r1的属性 # 设置kafka源 ex_trade_agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource # 设置kafka消费者组id (不能改动!改动会丢失已记录的offset) ex_trade_agent.sources.r1.kafka.consumer.group.id = ex_trade_flume ex_trade_agent.sources.r1.kafka.bootstrap.servers = 192.168.102.2:9092,192.168.102.3:9092,192.168.102.7:9092 ex_trade_agent.sources.r1.kafka.topics = ex_trade # 一批写入 channel 的最大消息数 ex_trade_agent.sources.r1.batchSize = 2000 # 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作 ex_trade_agent.sources.r1.batchDurationMillis = 2000 # 设置kafka-offset策略为从头消费 ex_trade_agent.sources.r1.kafka.consumer.auto.offset.reset = earliest # 配置Agent ex_trade_agent的sink k1的属性 # 配置hdfs目标 ex_trade_agent.sinks.k1.type = hdfs # 配置hdfs写入目录,每天一个文件夹,为之后hive表按日分区做准备 ex_trade_agent.sinks.k1.hdfs.path = /flume/ex_trade/date=%Y%m%d/ # 配置文件前缀 ex_trade_agent.sinks.k1.hdfs.filePrefix = ex_trade_ # 配置数据类型为原样输出 ex_trade_agent.sinks.k1.hdfs.fileType = DataStream # 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为1天防止产生小文件 ex_trade_agent.sinks.k1.hdfs.rollInterval = 86400 # 当前文件写入达到该大小后触发滚动创建新文件,单位:字节,设置为128M ex_trade_agent.sinks.k1.hdfs.rollSize = 134217700 # 向 HDFS 写入内容时每次批量操作的 Event 数量 ex_trade_agent.sinks.k1.hdfs.batchSize = 2000 # 不根据 Event 数量来分割文件 ex_trade_agent.sinks.k1.hdfs.rollCount = 0 # 文件写入格式设置为 Text,否则 Impala或 Apache Hive 无法读取这些文件。 ex_trade_agent.sinks.k1.hdfs.writeFormat = Text # 配置Agent ex_trade_agent的channel c1的属性,用来缓冲数据 ex_trade_agent.channels.c1.type = file # 配置通道的检查点目录 ex_trade_agent.channels.c1.checkpointDir = /data/deploy/flume/checkpoint # 配置通道的数据目录 ex_trade_agent.channels.c1.dataDirs = /data/deploy/flume/data # 把source和sink绑定到channel上 ex_trade_agent.sources.r1.channels = c1 ex_trade_agent.sinks.k1.channel = c1 启动脚本: ...

2022年6月26日 · 2 分钟

flume动态加载配置原理

Flume NG支持运行时动态修改配置的配置模块 细说一下PollingPropertiesFileConfigurationProvider提供的运行时动态修改配置并生效的能力。 要实现动态修改配置文件并生效,主要有两个待实现的功能 观察配置文件是否修改 如果修改,将修改的内容通知给观察者 对于第一点,监控配置文件是否修改,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程采用定时轮询的方式来监控,轮询频率是30毫秒一次,比较file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被修改 public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); } 对于第二点,利用Guava EventBus提供的发布订阅模式机制,将配置修改封装成事件传递给Application,来重新加载配置 ...

2022年6月12日 · 2 分钟

CDH组件参数调优

1.YARN参数调优 检查项 当前值 修改值 JobHistory Server 的 Java 堆栈大小 1GB 2GB NodeManager 的 Java 堆栈大小 1GB 2GB ResourceManager 的 Java 堆栈大小 1GB 2GB 容器内存 yarn.nodemanager.resource.memory-mb 24GB 32GB 最小容器内存 yarn.scheduler.minimum-allocation-mb 10GB 8GB 最大容器内存 yarn.scheduler.maximum-allocation-mb 40GB 56GB Map 任务内存 mapreduce.map.memory.mb 0M 12GB Reduce 任务内存 mapreduce.reduce.memory.mb 0M 24GB Application Master容器内存 yarn.app.mapreduce.am.resource.mb 24GB 32GB Map 任务 Java 选项库 mapreduce.map.java.opts -Djava.net.preferIPv4Stack=true -Dmapreduce.map.java.opts=-Xmx2048m Reduce 任务 Java 选项库 mapreduce.reduce.java.opts -Djava.net.preferIPv4Stack=true -Dmapreduce.reduce.java.opts=-Xmx2048m yarn.resourcemanager.scheduler.class org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler yarn.scheduler.capacity.root.queues: 当前值: <configuration> <property> <name>yarn.scheduler.capacity.root.queues</name> <value>default</value> </property> <property> <name>yarn.scheduler.capacity.root.capacity</name> <value>100</value> </property> <property> <name>yarn.scheduler.capacity.root.default.capacity</name> <value>100</value> </property> </configuration> 修改值: ...

2022年5月29日 · 2 分钟

CDH角色划分

1. 服务器配置 主节点: hostname: m1,m2,m3 vcore:48 内存:128G SSD:1T(不算系统盘) 工作节点: hostname: n1,n2,n3,n4 vcore : 48 内存:256G SSD:1T(不算系统盘) 2. 节点职责描述 m1: 控制核心;cdh核心,hadoop主节点 m2: 网关入口;主节点高可用,一些组件的web ui,用户入口 ,(前期做计算任务的driver端,后期优化driver打散到各节点) m3: 后台服务;组件元数据库,任务的history服务 ,(后期做元数据HA) n1~n4: 算存一体;提供存储,计算等服务 3.角色分配策略 一、hdfs NameNode一般在主节点上,初始化安装的时候没有高可用,所以有SecondaryNameNode的作为一个备份,NameNode它会将它拆分后进行分布式存储,其中的数据是分散在各个DataNode节点,且默认都会有3个副本,防止其中一台机器宕机使得数据缺失。balancer一般与namenode搭建在一起。 二、hive hive metastore server与hiveServer2一般搭载一起上,但也可以分开,因为hive服务需要启动hiveServer2,访问sparksql需要启动metastore而hive gateway,事实上并不是真正的角色,也没有状态,但它们充当了告诉客户端配置应该放置在哪里。 添加Hive服务时,默认情况下会创建Hive网关。 三、cloudera manager server 这个可以根据实际搭建,这个相当于是集群的监听器,在网页上出现的的图表也就是这个监听器类似的,这个可以搭建在主节点上,但若是主节点上分配的角色过多会影响其服务器的性能。 四、spark 这个角色可以分配这任意的机器上,按实际情况调整。spark-gateway全部部署在各个机器上,这个对于个人理解来说相当于spark、spark2机器之间的通信功能。 五、yarn jobhistory与resourcemanager进行通信,所以部署上一般在同一台机器上放在主节点上,而nodemanager分配在各个节点上 六、zookeeper 这个若是机器足够一般是奇数的,所以部署在m节点上比较合适。奇数台、高可用、与管理角色共置 七、hue 会对外提供一个web ui,以便于数据分析和数据开发做即席查询。这个服务随意部署,根据自己的机器部署情况来看。 4.角色划分详情表 控制核心 网关+入口 元数据+历史服务 存算一体 m1 m2 m3 n1 n2 n3 n4 cloudera management Alert Publisher ✅ Event Server ✅ Host Monitor ✅ Service Monitor ✅ hdfs NameNode ✅ ✅ JournalNode ✅ ✅ ✅ Failover Controller ✅ ✅ HttpFs ✅ DataNode ✅ ✅ ✅ ✅ yarn ResourceManager ✅ ✅ NodeManager ✅ ✅ ✅ ✅ JobHistory Server ✅ hive Hive MetaStore Server ✅ HiveServer2 ✅ HiveGateway ✅ ✅ ✅ ✅ ✅ ✅ ✅ spark Spark History Server ✅ Spark Gateway ✅ ✅ ✅ ✅ ✅ ✅ ✅ impala Impala StateStore ✅ Impala catalog Server ✅ Impala Daemon ✅ ✅ ✅ ✅ zookeeper zk-node ✅ ✅ ✅ hue Hue Server ✅ Hue Load Balancer ✅ 5. 未来升级项 work节点扩展硬盘,只需将新盘挂载到新目录/hadoop/data2或/hadoop/data3,更新hdfs配置就能完成存储扩展(支持热加入) ...

2022年5月15日 · 1 分钟

beeline导出数据

beeline -n chutianyu -p chutianyu –showHeader=false –outputformat=csv2 -e “select * from smp.india_imp_json_test2mongo ;” > india_imp_test1.json

2022年5月1日 · 1 分钟

如何重设消费者组位移

为什么要重设消费者组位移? 我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。 像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。 反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。 对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。 重设位移策略 不论是哪种设置方式,重设位移大致可以从两个维度来进行。 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。 下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。 Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。 Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。 Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。 表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地 “跳过” 此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的 “跳” 是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。 ...

2022年4月17日 · 3 分钟

如何调优 Kafka

调优目标 在做调优之前,我们必须明确优化 Kafka 的目标是什么。通常来说,调优是为了满足系统常见的非功能性需求。在众多的非功能性需求中,性能绝对是我们最关心的那一个。不同的系统对性能有不同的诉求,比如对于数据库用户而言,性能意味着请求的响应时间,用户总是希望查询或更新请求能够被更快地处理完并返回。 对 Kafka 而言,性能一般是指吞吐量和延时。 吞吐量,也就是 TPS,是指 Broker 端进程或 Client 端应用程序每秒能处理的字节数或消息数,这个值自然是越大越好。 延时和我们刚才说的响应时间类似,它表示从 Producer 端发送消息到 Broker 端持久化完成之间的时间间隔。这个指标也可以代表端到端的延时(End-to-End,E2E),也就是从 Producer 发送消息到 Consumer 成功消费该消息的总时长。和 TPS 相反,我们通常希望延时越短越好。 总之,高吞吐量、低延时是我们调优 Kafka 集群的主要目标,一会儿我们会详细讨论如何达成这些目标。在此之前,我想先谈一谈优化漏斗的问题。 优化漏斗 优化漏斗是一个调优过程中的分层漏斗,我们可以在每一层上执行相应的优化调整。总体来说,层级越靠上,其调优的效果越明显,整体优化效果是自上而下衰减的,如下图所示: 第 1 层:应用程序层。它是指优化 Kafka 客户端应用程序代码。比如,使用合理的数据结构、缓存计算开销大的运算结果,抑或是复用构造成本高的对象实例等。这一层的优化效果最为明显,通常也是比较简单的。 第 2 层:框架层。它指的是合理设置 Kafka 集群的各种参数。毕竟,直接修改 Kafka 源码进行调优并不容易,但根据实际场景恰当地配置关键参数的值,还是很容易实现的。 第 3 层:JVM 层。Kafka Broker 进程是普通的 JVM 进程,各种对 JVM 的优化在这里也是适用的。优化这一层的效果虽然比不上前两层,但有时也能带来巨大的改善效果。 第 4 层:操作系统层。对操作系统层的优化很重要,但效果往往不如想象得那么好。与应用程序层的优化效果相比,它是有很大差距的。 基础性调优 接下来,我就来分别介绍一下优化漏斗的 4 个分层的调优。 操作系统调优 我先来说说操作系统层的调优。在操作系统层面,你最好在挂载(Mount)文件系统时禁掉 atime 更新。atime 的全称是 access time,记录的是文件最后被访问的时间。记录 atime 需要操作系统访问 inode 资源,而禁掉 atime 可以避免 inode 访问时间的写入操作,减少文件系统的写操作数。你可以执行 mount -o noatime 命令进行设置。 ...

2022年4月3日 · 3 分钟