hive映射es

一 hive导入es 1 创建hive-es映射表 CREATE EXTERNAL TABLE hive_es.re_run_test2( id STRING ,test STRING) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 're_run_test2/test', 'es.nodes'='172.16.98.113,172.16.98.149,172.16.98.150,172.16.98.151,172.16.98.152', 'es.port'='9200', 'es.mapping.id' ='id') 注: 1. es.resource对应es中的index/type 2. 1.es.mapping.names为hive和es字段名映射关系。 2.如果hive表和es表字段名完全一致,可以省略此参数。 3.hive中字段名不区分大小写,元数据寸的全是小写;es中字段大小写敏感,如果es中字段名出现大写,需认真填写。 4.es中_id为自动生成,如若需要覆盖,需加参数'es.mapping.id'='id' 2 先导入es映射表相关jar包 add jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hive/auxlib/elasticsearch-hadoop-6.3.0.jar; add jar /data/jar/httpclient-4.5.5.jar; add jar /data/jar/org.apache.commons.httpclient.jar; 3 向映射表insert数据 二 es导入hive 1 建hive映射表 CREATE EXTERNAL TABLE hive_es.cty_test5( addTime string ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'cty_test/cty_test', 'es.nodes'='172.16.98.113,172.16.98.149,172.16.98.150,172.16.98.151,172.16.98.152', 'es.port'='9200', 'es.mapping.names'= 'addTime:addTime', 'es.mapping.date.rich'='false', 'es.index.auto.create'='false', ) 注意,hive表数据类型要和es一致,除了es的date要转成hive的string,同时要加参数’es.mapping.date.rich’=‘false’,否则查询会报错. 2 通过映射表向其他表insert 参考: https://www.cnblogs.com/koushr/p/9505435.html

2022年10月16日 · 1 分钟

flume示例

conf: # ex_trade.conf:外贸数据自kafka接入hdfs # 配置负责人:褚天宇 # 配置时间: 2022-01-17 # 配置Agent ex_trade_agent各个组件的名称 ex_trade_agent.sources = r1 ex_trade_agent.sinks = k1 ex_trade_agent.channels = c1 # 配置Agent ex_trade_agent的source r1的属性 # 设置kafka源 ex_trade_agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource # 设置kafka消费者组id (不能改动!改动会丢失已记录的offset) ex_trade_agent.sources.r1.kafka.consumer.group.id = ex_trade_flume ex_trade_agent.sources.r1.kafka.bootstrap.servers = 192.168.102.2:9092,192.168.102.3:9092,192.168.102.7:9092 ex_trade_agent.sources.r1.kafka.topics = ex_trade # 一批写入 channel 的最大消息数 ex_trade_agent.sources.r1.batchSize = 2000 # 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作 ex_trade_agent.sources.r1.batchDurationMillis = 2000 # 设置kafka-offset策略为从头消费 ex_trade_agent.sources.r1.kafka.consumer.auto.offset.reset = earliest # 配置Agent ex_trade_agent的sink k1的属性 # 配置hdfs目标 ex_trade_agent.sinks.k1.type = hdfs # 配置hdfs写入目录,每天一个文件夹,为之后hive表按日分区做准备 ex_trade_agent.sinks.k1.hdfs.path = /flume/ex_trade/date=%Y%m%d/ # 配置文件前缀 ex_trade_agent.sinks.k1.hdfs.filePrefix = ex_trade_ # 配置数据类型为原样输出 ex_trade_agent.sinks.k1.hdfs.fileType = DataStream # 当前文件写入达到该值时间后触发滚动创建新文件,单位:秒,设置为1天防止产生小文件 ex_trade_agent.sinks.k1.hdfs.rollInterval = 86400 # 当前文件写入达到该大小后触发滚动创建新文件,单位:字节,设置为128M ex_trade_agent.sinks.k1.hdfs.rollSize = 134217700 # 向 HDFS 写入内容时每次批量操作的 Event 数量 ex_trade_agent.sinks.k1.hdfs.batchSize = 2000 # 不根据 Event 数量来分割文件 ex_trade_agent.sinks.k1.hdfs.rollCount = 0 # 文件写入格式设置为 Text,否则 Impala或 Apache Hive 无法读取这些文件。 ex_trade_agent.sinks.k1.hdfs.writeFormat = Text # 配置Agent ex_trade_agent的channel c1的属性,用来缓冲数据 ex_trade_agent.channels.c1.type = file # 配置通道的检查点目录 ex_trade_agent.channels.c1.checkpointDir = /data/deploy/flume/checkpoint # 配置通道的数据目录 ex_trade_agent.channels.c1.dataDirs = /data/deploy/flume/data # 把source和sink绑定到channel上 ex_trade_agent.sources.r1.channels = c1 ex_trade_agent.sinks.k1.channel = c1 启动脚本: ...

2022年6月26日 · 2 分钟