PostgreSQL同步HDFS

1.使用datax同步数据 2.模板json(已配置hdfs ha): { "job": { "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "username": "hs_sync", "password": "Pass2025", "column": [ "order_date", "day", "iso_day_of_week", "weekday_cn", "weekday_en", "weekday_short", "is_weekend", "iso_week", "month", "month_cn", "month_en_full", "month_en_short", "quarter", "year" ], "connection": [ { "table": [ "dim_calendar" ], "jdbcUrl": [ "jdbc:postgresql://100.64.0.10:25432/hs_sync_data" ] } ], "fetchSize": 1000 } }, "writer": { "name": "hdfswriter", "parameter": { "defaultFS": "hdfs://nameservice1", "hadoopConfig": { "dfs.nameservices": "nameservice1", "dfs.ha.namenodes.nameservice1": "nn1,nn2", "dfs.namenode.rpc-address.nameservice1.nn1": "192.168.33.61:8020", "dfs.namenode.rpc-address.nameservice1.nn2": "192.168.33.62:8020", "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" }, "fileType": "orc", "path": "/tmp/data/test/", "fileName": "dim_calendar", "writeMode": "truncate", "column": [ { "name": "order_date", "type": "date" }, { "name": "day", "type": "smallint" }, { "name": "iso_day_of_week", "type": "smallint" }, { "name": "weekday_cn", "type": "string" }, { "name": "weekday_en", "type": "string" }, { "name": "weekday_short", "type": "string" }, { "name": "is_weekend", "type": "boolean" }, { "name": "iso_week", "type": "smallint" }, { "name": "month", "type": "smallint" }, { "name": "month_cn", "type": "string" }, { "name": "month_en_full", "type": "string" }, { "name": "month_en_short", "type": "string" }, { "name": "quarter", "type": "string" }, { "name": "year", "type": "smallint" } ], "fieldDelimiter": "\t", "maxFileSize": 134217728, "encoding": "UTF-8" } } } ], "setting": { "speed": { "channel": 5 }, "errorLimit": { "record": 0, "percentage": 0.02 }, "retry": { "limit": 3, "interval": 5000 } } } }

2026年2月12日 · 1 分钟

CDH6.3.2集群部署

使用ansible做自动化部署 CDH 集群安装指南 环境准备 项目 值 NFS 服务器 xxx.xxx.xxx.xxx:/nfs/share Inventory inventory/cdh-init.ini 安装步骤 步骤 1:配置 /etc/hosts 和挂载 NFS ansible-playbook cdh-hosts-nfs.yml -i inventory/cdh-init.ini 步骤 2:系统初始化 ansible-playbook cdh-init-raw.yaml -i inventory/cdh-init.ini 验证 # 验证 NFS 挂载 ansible cdh_all -i inventory/cdh-init.ini -m shell -a "ls /mnt" -b # 验证 Java ansible cdh_all -i inventory/cdh-init.ini -m shell -a "java -version" # 验证 SELinux ansible cdh_all -i inventory/cdh-init.ini -m shell -a "sestatus" # 验证 haveged ansible cdh_all -i inventory/cdh-init.ini -m shell -a "systemctl status haveged" 安装介质目录 /mnt/ ├── cdh/ # CDH Parcel ├── ClouderaManager/ # CM 6.3.1 └── mysql/ # MySQL 5.7 RPM

2026年1月30日 · 1 分钟

Ranger配置大数据组件

1.开启ranger-hive ranger-hdfs插件 2.修改hdfs配置 1.开启hdfs认证 hadoop.security.authorization = true

2023年4月2日 · 1 分钟

Kafka数据导入ClickHouse

1.Kafka中数据导入ClickHouse的标准流程 在ClickHouse中建立Kafka Engine 外表,作为Kafka数据源的一个接口 在ClickHouse中创建普通表(通常是MergeTree系列)存储Kafka中的数据 在ClickHouse中创建Materialized View, 监听Kafka中的数据,并将数据写入ClickHouse存储表中 上述三个步骤,就可以将Kafka中的数据导入到ClickHouse集群中。 2. Kafka数据导入ClickHouse详细步骤 ClickHouse 提供了Kafka Engine 作为访问Kafka集群的一个接口(数据流)。有了这个接口后,导入数据就很方便了,具体步骤如下: 步骤1:创建Kafka Engine CREATE TABLE source ( `ts` DateTime, `tag` String, `message` String ) ENGINE = Kafka() SETTINGS kafka_broker_list = '172.19.0.47:9092', kafka_topic_list = 'tag', kafka_group_name = 'clickhouse', kafka_format = 'JSONEachRow', kafka_skip_broken_messages = 1, kafka_num_consumers = 2 必选参数: kafkabrokerlist: 这里填写Kafka服务的broker列表,用逗号分隔 kafkatopiclist: 这里填写Kafka topic,多个topic用逗号分隔 kafkagroupname:这里填写消费者group名称 kafkaformat__:Kafka数据格式, ClickHouse支持的Format, 详见这里 可选参数: kafkaskipbrokenmessages:填写大于等于0的整数,表示忽略解析异常的Kafka数据的条数。如果出现了N条异常后,后台线程结束,Materialized View会被重新安排后台线程去监听数据 kafkanumconsumers_: 单个Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应topic的partitions总数 kafkarowdelimiter: 消息分隔符 kafkaschema__:对于kafkaformat需要schema定义的时候,其schema由该参数确定 kafkamaxblocksize: 该参数控制Kafka数据写入目标表的Block大小,超过该数值后,就将数据刷盘。 步骤2:创建存储Kafka数据的目标表,该表就是最终存储Kafka数据 本文中,采用MergeTree来存储Kafka数据: CREATE TABLE target ( `ts` DateTime, `tag` String ) ENGINE = MergeTree() PARTITION BY toYYYYMM(ts) ORDER BY tag 步骤3:创建Metrialized View 抓取数据 本文中,采用如下语句创建MV: ...

2023年1月8日 · 1 分钟

如何重设消费者组位移

为什么要重设消费者组位移? 我们知道,Kafka 和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka 的消费者读取消息是可以重演的(replayable)。 像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功处理,就会被从 Broker 上删除。 反观 Kafka,由于它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,是只读的操作,因此消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。 对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢?我在这里统一回答一下。如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。 重设位移策略 不论是哪种设置方式,重设位移大致可以从两个维度来进行。 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。 下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。 Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。 Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。 Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。 表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地 “跳过” 此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的 “跳” 是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。 ...

2022年4月17日 · 3 分钟