/pysqoop

python 封装 pysqoop

Primary LanguagePython

使用方法

环境

  • python3 pymysql/retrying
  • sqoop-cdh
  • mysql 建表语句 dmp_data_source
CREATE TABLE `dmp_data_source` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `db_name` varchar(50) NOT NULL COMMENT '数据库名称',
  `type` varchar(10) NOT NULL COMMENT '读写类别read/write',
  `connect` varchar(500) NOT NULL COMMENT 'jdbc地址',
  `user_name` varchar(100) NOT NULL COMMENT '数据库用户',
  `passwd` varchar(200) NOT NULL,
  `db_type` varchar(30) NOT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `dmp_data_source_id_uindex` (`id`),
  UNIQUE KEY `uqk_db` (`db_name`,`type`,`db_type`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='数据源配置信息'

mysql2hive

mysql-full
dstDb_Table=oyo_tmp.hera_job_2221
mapper_nums=1
srcDb_Table=rdfa_datastudio.hera_job

res_query="select * ,now() as etl_time from  rdfa_datastudio.hera_job WHERE \$CONDITIONS "
python3 app.py  mysql-full \
${srcDb_Table} \
${dstDb_Table} \
"${res_query}" \
${mapper_nums}
mysql-incre
dstDb_Table=oyo_tmp.hera_job_15
mapper_nums=1
srcDb_Table=hera.hera_job

res_query="
select  id,configs,gmt_modified,now() as etl_time
from hera.hera_job
WHERE \$CONDITIONS 
and gmt_modified>=\"${zdt.addDay(-1).add(11,-1).format("yyyy-MM-dd HH:mm:ss")}\" "


python3 /u01/hera/app/py_sqoop/app.py  mysql-incre \
${srcDb_Table} \
${dstDb_Table} \
"${res_query}" \
${mapper_nums}
mysql-incre-par
dstDb_Table=oyo_tmp.hera_job_di
mapper_nums=1
srcDb_Table=hera.hera_job



res_query="
select  id,configs,now() as etl_time
from hera.hera_job
WHERE \$CONDITIONS 
and gmt_modified>=\"${zdt.addDay(-1).add(11,-1).format("yyyy-MM-dd HH:mm:ss")}\" "


partition_key=dt
partition_value=${zdt.addDay(-1).format("yyyy-MM-dd")}

python3 /u01/hera/app/py_sqoop/app.py  mysql-incre-par ${srcDb_Table} \
${dstDb_Table} \
"${res_query}" \
${mapper_nums} \
${partition_key} \
${partition_value}

hive2mysql:

删除后写入
dstDb_Table=hera.lihm_20190812
dstDb_Table_columns=member_id,perfer_citys,live_city,base_city,sync_time
mapper_nums=2
srcDb_Table=dw.dw_datacenter_new_member_dim_dw

res_query="select member_id,perfer_citys,live_city,base_city,sync_time from dw.dw_datacenter_new_member_dim_dw limit 1000 "

pre_sql="DELETE FROM hera.lihm_20190812 WHERE id < 10"

python3 app.py load-mysql \
${srcDb_Table} \
${dstDb_Table} \
${dstDb_Table_columns} \
"${res_query}" \
${mapper_nums} \
"${pre_sql}"
upsert 更新并插入
dstDb_Table=hera.hera_job_lihm
mapper_nums=1
srcDb_Table=oyo_ods.ods_hera_hera_job
update_key=id

res_query="select id
,auto
,'lihm03' as name 
from oyo_ods.ods_hera_hera_job "

python3 app.py load-mysql-upsert \
${srcDb_Table} \
${dstDb_Table} \
${update_key} \
"${res_query}" \
${mapper_nums}