注:前提不泄露公司信息
1.维表事实表设计
2.源表
2.1 数据导入
# Connection settings for the source database (placeholders).
# NOTE(review): --password on the command line is visible in the process list;
# the Sqoop user guide recommends --password-file or -P instead.
connect="xxx"
username="xxx"
password="xxxx"

# full_import TABLE
#   One-off full load of source table TABLE into Hive table default.TABLE.
full_import() {
    local table="$1"

    sqoop import \
        --connect "${connect}" \
        --username "${username}" \
        --password "${password}" \
        --table "${table}" \
        --hive-import \
        --hive-table "default.${table}"
}

# create_incremental_job TABLE CHECK_COLUMN
#   Saves a Sqoop job named TABLE_add_job_zw that pulls rows whose CHECK_COLUMN
#   is newer than the stored last-value and merges them on the `id` key.
#   `sqoop job --create` only stores the definition; it still has to be run
#   (e.g. from a scheduler) with `sqoop job --exec TABLE_add_job_zw`.
#   NOTE(review): the job passes --hive-table but neither --hive-import nor
#   --target-dir -- confirm where the merged output is expected to land.
create_incremental_job() {
    local table="$1"
    local check_column="$2"

    sqoop job --create "${table}_add_job_zw" -- import \
        --connect "${connect}" \
        --username "${username}" \
        --password "${password}" \
        --table "${table}" \
        --fields-terminated-by "\t" \
        --lines-terminated-by "\n" \
        --hive-drop-import-delims \
        --hive-table "default.${table}" \
        --incremental lastmodified \
        --check-column "${check_column}" \
        --last-value '2015-01-01 00:00:00' \
        -m 1 \
        --merge-key id
}

######################################################################
# customer: updated_at, refreshed hourly --> default.customer
full_import customer
# Create the job that captures changed rows (scheduled daily).
create_incremental_job customer updated_at

######################################################################
# customer_coupon: updated_at, hourly --> default.customer_coupon
full_import customer_coupon
create_incremental_job customer_coupon updated_at

######################################################################
# order_payment: created_at, hourly --> default.order_payment
full_import order_payment
create_incremental_job order_payment created_at

######################################################################
# taxi_order_payment: created_at, hourly --> default.taxi_order_payment
full_import taxi_order_payment
create_incremental_job taxi_order_payment created_at

######################################################################
# ticket_payment: updated_at, hourly --> default.ticket_payment
full_import ticket_payment
create_incremental_job ticket_payment updated_at

######################################################################
# buspool_payment: updated_at, hourly --> default.buspool_payment
full_import buspool_payment
create_incremental_job buspool_payment updated_at

######################################################################
# coupon: created_at, daily --> default.coupon
full_import coupon
create_incremental_job coupon created_at

######################################################################
# giftpack: created_at, daily --> default.giftpack
full_import giftpack
create_incremental_job giftpack created_at

######################################################################
# action_center: created_at, daily --> default.action_center
full_import action_center
create_incremental_job action_center created_at

######################################################################
# bus_coupon: updatedAt, daily --> default.bus_coupon
full_import bus_coupon
create_incremental_job bus_coupon updatedAt

3.维度表和事实表
3.1 数据装载
-- ###########################################################################
-- DW dimension / fact table structures
-- ###########################################################################

-- dim_action: activity dimension, copied from default.action_center.
create table default.dim_action (
    id          int,
    city        string,
    title       string,
    deleted_at  string,
    created_at  string comment '创建时间',
    updated_at  string comment '更新时间',
    remark      string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- INSERT ... SELECT maps columns by position, so the select list must follow
-- the DDL order (id, city, title, ...). The previous list had title before
-- city, which silently stored titles in `city` and cities in `title`.
insert overwrite table default.dim_action
select
    id,
    city,
    title,
    deleted_at,
    created_at,
    updated_at,
    remark
from default.action_center;

-- dim_coupon: coupon template dimension, copied from default.coupon.
create table default.dim_coupon (
    id           int,
    name         string,
    city         string,
    max          bigint,
    type         int,
    value        double,
    duration     int,
    enable       int,
    created_at   string,
    updated_at   string,
    minCost      double,
    maxDiscount  double,
    expired_at   string,
    scope        int,
    isPool       int,
    minPay       int,
    purpose      int,
    remark       string
)
row format delimited fields terminated by '\t'
stored as textfile;

insert overwrite table default.dim_coupon
select
    id,
    name,
    city,
    max,
    type,
    value,
    duration,
    enable,
    created_at,
    updated_at,
    minCost,
    maxDiscount,
    expired_at,
    scope,
    isPool,
    minPay,
    purpose,
    remark
from default.coupon;

-- ############ dim_customer
-- dim_customer: customer dimension, copied from default.customer.
-- `phone` is declared string: the report tables built from it declare
-- `cphone` as string / varchar(20), and an int column would turn any phone
-- number above the 32-bit range into NULL.
-- NOTE(review): `password` and `alipay_user_id` are still int -- confirm the
-- source column types, and whether `password` should be copied at all.
create table default.dim_customer (
    id              int,
    name            string,
    phone           string,
    email           string,
    password        int,
    created_at      string,
    updated_at      string,
    balance         int,
    rating          double,
    rating_total    double,
    rating_count    int,
    city            string,
    alipay_user_id  int
)
row format delimited fields terminated by '\t'
stored as textfile;

insert overwrite table default.dim_customer
select
    id,
    name,
    phone,
    email,
    password,
    created_at,
    updated_at,
    balance,
    rating,
    rating_total,
    rating_count,
    city,
    alipay_user_id
from default.customer;

-- ############## dim_giftpack: gift-pack dimension, copied from default.giftpack.
create table default.dim_giftpack (
    id             int,
    city           string,
    name           string,
    max            bigint,
    enable         int,
    type           string,
    expired_at     string,
    created_at     string,
    effected_at    string,
    updated_at     string,
    way            int,
    recharge_base  bigint,
    recharge_type  int,
    remark         string
)
row format delimited fields terminated by '\t'
stored as textfile;

insert overwrite table default.dim_giftpack
select
    id,
    city,
    name,
    max,
    enable,
    type,
    expired_at,
    created_at,
    effected_at,
    updated_at,
    way,
    recharge_base,
    recharge_type,
    remark
from default.giftpack;

-- ############ fct_order_payment_deduction
-- Coupon deductions gathered from the four payment tables. `status` records
-- the originating table: 1 = order_payment, 2 = taxi_order_payment,
-- 3 = ticket_payment, 4 = buspool_payment.
-- Created in `default` explicitly because every reader uses the qualified name.
create table default.fct_order_payment_deduction (
    coupon_id   bigint comment 'id',
    price       double,
    created_at  string,
    status      int
)
row format delimited fields terminated by '\t'
stored as textfile;

insert overwrite table default.fct_order_payment_deduction
select
    r.coupon_id,
    r.price,
    r.created_at,
    r.status
from (
    select p.info coupon_id, p.price, p.created_at, 1 status
    from default.order_payment p
    where p.type = 'coupon'
    union all
    select q.info coupon_id, q.price, q.created_at, 2 status
    from default.taxi_order_payment q
    where q.type = 'coupon'
    union all
    select m.info coupon_id, m.price, m.created_at, 3 status
    from default.ticket_payment m
    where m.type = 'coupon'
    union all
    select n.info coupon_id, n.price, n.created_at, 4 status
    from default.buspool_payment n
    where n.type = 'coupon'
) r;

-- #################################
-- fct_customer_coupon: one row per issued customer coupon, copied from
-- default.customer_coupon. (The trailing comma after the last column in the
-- earlier DDL was a syntax error and has been removed.)
create table default.fct_customer_coupon (
    id                 bigint comment '唯一主键',
    customer_id        bigint,
    coupon_id          bigint,
    giftpack_id        bigint,
    city               string,
    source             string,
    type               int,
    status             int,
    expire_at          string,
    created_at         string,
    updated_at         string,
    value              double,
    source_type        string,
    source_id          bigint,
    ispool             int,
    action_id          bigint,
    balance_detail_id  bigint,
    remark             string
)
row format delimited fields terminated by '\t'
stored as textfile;

insert overwrite table default.fct_customer_coupon
select
    x.id,
    x.customer_id,
    x.coupon_id,
    x.giftpack_id,
    x.city,
    x.source,
    x.type,
    x.status,
    x.expire_at,
    x.created_at,
    x.updated_at,
    x.value,
    x.source_type,
    x.source_id,
    x.ispool,
    x.action_id,
    x.balance_detail_id,
    x.remark
from default.customer_coupon x;

4.数据清洗到报表层
-- ###########################################################################
-- Data in the default layer is transformed into the rpt layer with HQL
-- ###########################################################################

-- ########################### rpt_customer_coupon_details
-- One row per (customer coupon, matching deduction); coupons without any
-- deduction keep a single row with all four price columns at 0.
drop table if exists default.rpt_customer_coupon_details;

create table default.rpt_customer_coupon_details (
    `id`            int,
    `date_id`       string,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double
)
comment 'rpt_customer_coupon_details'
row format delimited fields terminated by '\t'
stored as textfile;

-- The CASE expressions always yield a value (ELSE 0), so no outer nvl() is
-- needed around them. pay.status identifies the payment table the deduction
-- came from (1..4), which selects the price column it is reported under.
insert into default.rpt_customer_coupon_details
select
    cc.id,
    to_date(cc.created_at)  as date_id,
    cust.`name`             as cname,
    cust.phone              as cphone,
    cc.city                 as ccity,
    cc.coupon_id            as couponid,
    cpn.`name`              as couponname,
    act.city                as accity,
    cc.type                 as ctype,
    cc.`value`              as couponvalue,
    cpn.duration,
    gp.`name`               as pagename,
    cc.expire_at            as expired_at,
    cc.status,
    cpn.minCost,
    cpn.maxDiscount,
    cpn.minPay,
    cc.created_at,
    cc.updated_at,
    cc.source,
    act.title               as actionname,
    case when pay.status = 1 then nvl(pay.price, 0) else 0 end as carprice,
    case when pay.status = 2 then nvl(pay.price, 0) else 0 end as taxiprice,
    case when pay.status = 3 then nvl(pay.price, 0) else 0 end as ticketprice,
    case when pay.status = 4 then nvl(pay.price, 0) else 0 end as buspoolprice
from default.fct_customer_coupon cc
left join default.dim_customer cust
    on cc.customer_id = cust.id
left join default.dim_coupon cpn
    on cc.coupon_id = cpn.id
left join default.dim_giftpack gp
    on cc.giftpack_id = gp.id
left join default.dim_action act
    on cc.action_id = act.id
left join default.fct_order_payment_deduction pay
    on cc.id = pay.coupon_id;

-- ############################# rpt_customer_coupon_action_statistic
-- Daily coupon statistics per activity (action_id / title / city).
drop table if exists default.rpt_customer_coupon_action_statistic;

create table default.rpt_customer_coupon_action_statistic (
    `date_id`       string,
    `action_id`     int,
    `title`         string,
    `city`          string,
    `customer_cnt`  int,
    `total_cnt`     int,
    `usecnt`        int,
    `unusecnt`      int,
    `outcnt`        int,
    `ocnt`          int,
    `qcnt`          int,
    `pcnt`          int,
    `rcnt`          int,
    `oprice`        double,
    `qprice`        double,
    `pprice`        double,
    `rprice`        double
)
comment 'rpt_customer_coupon_action_statistic'
row format delimited fields terminated by '\t'
stored as textfile;

-- Inner query: one row per (coupon, deduction), with the deduction price put
-- in the column for its payment source:
--   status 1 -> oprice, 2 -> pprice, 3 -> qprice, 4 -> rprice.
-- Outer query: counts by coupon status (1 / 0 / -1) and by "has a positive
-- price of that kind", plus the price totals, per day and activity.
-- NOTE(review): a coupon with several deduction rows is counted once per row
-- in total_cnt and the status counters -- confirm that is intended.
insert into default.rpt_customer_coupon_action_statistic
select
    s.date_id,
    s.action_id,
    s.title,
    s.city,
    count(distinct s.customer_id)                   as customer_cnt,
    count(s.id)                                     as total_cnt,
    count(case when s.cstatus = 1  then s.id end)   as usecnt,
    count(case when s.cstatus = 0  then s.id end)   as unusecnt,
    count(case when s.cstatus = -1 then s.id end)   as outcnt,
    count(case when s.oprice > 0 then s.id end)     as ocnt,
    count(case when s.qprice > 0 then s.id end)     as qcnt,
    count(case when s.pprice > 0 then s.id end)     as pcnt,
    count(case when s.rprice > 0 then s.id end)     as rcnt,
    sum(s.oprice)                                   as oprice,
    sum(s.qprice)                                   as qprice,
    sum(s.pprice)                                   as pprice,
    sum(s.rprice)                                   as rprice
from (
    select
        cc.id,
        to_date(cc.created_at)  as date_id,
        cc.action_id,
        act.city,
        act.title,
        cc.customer_id,
        cc.`status`             as cstatus,
        case when pay.status = 1 then nvl(pay.price, 0) else 0 end as oprice,
        case when pay.status = 2 then nvl(pay.price, 0) else 0 end as pprice,
        case when pay.status = 3 then nvl(pay.price, 0) else 0 end as qprice,
        case when pay.status = 4 then nvl(pay.price, 0) else 0 end as rprice
    from default.fct_customer_coupon cc
    left join default.dim_action act
        on cc.action_id = act.id
    left join default.fct_order_payment_deduction pay
        on cc.id = pay.coupon_id
) s
group by
    s.date_id,
    s.action_id,
    s.city,
    s.title;

-- #########################
-- Daily coupon statistics per (relabelled) coupon source.
drop table if exists default.rpt_customer_coupon_source_statistic;

create table default.rpt_customer_coupon_source_statistic (
    `date_id`       string,
    `source`        string,
    `customer_cnt`  int,
    `total_cnt`     int,
    `usecnt`        int,
    `unusecnt`      int,
    `outcnt`        int,
    `ocnt`          int,
    `qcnt`          int,
    `pcnt`          int,
    `rcnt`          int,
    `oprice`        double,
    `qprice`        double,
    `pprice`        double,
    `rprice`        double
)
comment 'rpt_customer_coupon_source_statistic'
row format delimited fields terminated by '\t'
stored as textfile;

-- Inner query: relabels the raw source code (the labels are redacted
-- placeholders) and puts each deduction price in the column for its payment
-- source: status 1 -> oprice, 2 -> pprice, 3 -> qprice, 4 -> rprice.
-- Outer query: same counters and totals as the action statistic, grouped by
-- day and relabelled source.
insert into default.rpt_customer_coupon_source_statistic
select
    s.date_id,
    s.source                                        as source,
    count(distinct s.customer_id)                   as customer_cnt,
    count(s.id)                                     as total_cnt,
    count(case when s.cstatus = 1  then s.id end)   as usecnt,
    count(case when s.cstatus = 0  then s.id end)   as unusecnt,
    count(case when s.cstatus = -1 then s.id end)   as outcnt,
    count(case when s.oprice > 0 then s.id end)     as ocnt,
    count(case when s.qprice > 0 then s.id end)     as qcnt,
    count(case when s.pprice > 0 then s.id end)     as pcnt,
    count(case when s.rprice > 0 then s.id end)     as rcnt,
    sum(s.oprice)                                   as oprice,
    sum(s.qprice)                                   as qprice,
    sum(s.pprice)                                   as pprice,
    sum(s.rprice)                                   as rprice
from (
    select
        cc.id,
        to_date(cc.created_at)  as date_id,
        case cc.source
            when 'complete' then 'xxxx'
            when 'feedback' then 'xxx'
            when 'first'    then 'xxx'
            when 'invite'   then 'xxx'
            when 'keyword'  then 'xx'
            when 'new'      then 'xx'
            when 'recharge' then 'xx'
            when 'share'    then 'xx'
            else 'xxx'
        end                     as source,
        cc.customer_id,
        cc.`status`             as cstatus,
        case when pay.status = 1 then nvl(pay.price, 0) else 0 end as oprice,
        case when pay.status = 2 then nvl(pay.price, 0) else 0 end as pprice,
        case when pay.status = 3 then nvl(pay.price, 0) else 0 end as qprice,
        case when pay.status = 4 then nvl(pay.price, 0) else 0 end as rprice
    from default.fct_customer_coupon cc
    left join default.fct_order_payment_deduction pay
        on cc.id = pay.coupon_id
) s
group by
    s.date_id,
    s.source;

5.报表层数据输出到mysql
# Connection settings for the MySQL report database (placeholders).
mysqlconnect="xxx"
username="xxx"
password="xxxx"
hdfsconnect="xxxx"

## Export rpt_customer_coupon_details.
# Step 1: empty the MySQL target so the export is a full reload.
sqoop eval \
    --connect "${mysqlconnect}" \
    --username "${username}" \
    --password "${password}" \
    --query "truncate table rpt_customer_coupon_details"

# Step 2: load the Hive table's files into MySQL.
# - No --update-key / --update-mode updateonly: update-only exports issue
#   UPDATE statements and never insert, so against the table truncated above
#   they would load nothing.
# - hdfscon1 must hold the HDFS directory of default.rpt_customer_coupon_details.
#   It is not assigned anywhere in this file, so fail fast when it is unset,
#   and use double quotes so the shell actually expands it.
# - The --input-* delimiters describe the files being read; they match the
#   Hive DDL (tab-separated text).
sqoop export \
    --connect "${mysqlconnect}" \
    --username "${username}" \
    --password "${password}" \
    --table rpt_customer_coupon_details \
    --fields-terminated-by "\t" \
    --lines-terminated-by "\n" \
    --input-fields-terminated-by "\t" \
    --input-lines-terminated-by "\n" \
    --input-null-non-string '\\N' \
    --input-null-string '\\N' \
    --export-dir "${hdfscon1:?set hdfscon1 to the HDFS directory of rpt_customer_coupon_details}" \
    --columns="id,date_id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice" \
    -m 4 \
    --direct

###### Export rpt_customer_coupon_action_statistic
sqoop eval \ --connect ${mysqlconnect} \ --username ${username} \ --password ${password} \ --query "truncate table rpt_customer_coupon_action_statistic"sqoop export \
--connect ${mysqlconnect} \ --username ${username} \ --password ${password} \ --table rpt_customer_coupon_action_statistic \ --fields-terminated-by "\t" \ --lines-terminated-by "\n" \ --input-fields-terminated-by "\t" \ --input-lines-terminated-by "\n" \ --input-null-non-string '\\N' \ --input-null-string '\\N' \ --export-dir ${hdfscon2} \ --columns="date_id,action_id,title,city,customer_cnt,total_cnt,usecnt,unusecnt,outcnt,ocnt,qcnt,pcnt,rcnt,oprice,qprice,pprice,rprice" \ -m 1 ## rpt_customer_coupon_source_statistic sqoop eval \ --connect ${mysqlconnect} \ --username ${username} \ --password ${password} \ --query "truncate table rpt_customer_coupon_source_statistic"sqoop export \
--connect ${mysqlconnect} \ --username ${username} \ --password ${password} \ --table rpt_customer_coupon_source_statistic \ --fields-terminated-by "\t" \ --lines-terminated-by "\n" \ --input-fields-terminated-by "\t" \ --input-lines-terminated-by "\n" \ --input-null-non-string '\\N' \ --input-null-string '\\N' \ --export-dir '${hdfscon3}' \ --columns="date_id,source,customer_cnt,total_cnt,usecnt,unusecnt,outcnt,ocnt,qcnt,pcnt,rcnt,oprice,qprice,pprice,rprice" \ -m 1 ############################################################################################# # hive表天分区 ############################################################################################# select max(to_date(updated_at)),min(to_date(updated_at)) from rpt_customer_coupon_details limit 10; select distinct(to_date(updated_at)) from rpt_customer_coupon_details; select distinct(substr(to_date(updated_at),6,2)) from rpt_customer_coupon_details; drop table if exists default.rpt_customer_coupon_details_partition;CREATE TABLE default.rpt_customer_coupon_details_partition (
`id` int, `cname` string, `cphone` string, `ccity` string, `couponid` int, `couponname` string, `accity` string, `ctype` int, `couponvalue` double , `duration` string , `pagename` string , `expired_at` string , `status` int , `minCost` double , `maxDiscount` double , `minPay` double , `created_at` string , `updated_at` string, `source` string , `actionname` string , `carprice` double, `taxiprice` double, `ticketprice` double, `buspoolprice` double ) comment 'rpt_customer_coupon_details' partitioned by (year string,month string,`date_id` string) row format delimited fields terminated by '\t' stored as textfile;########设置动态分区
-- Session settings for the dynamic-partition insert that follows:
-- turn dynamic partitioning on, allow every partition column to be dynamic
-- (nonstrict), and raise the per-node / total partition and file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- NOTE(review): dfs.datanode.* looks like a DataNode-side HDFS setting --
-- confirm that setting it per Hive session has any effect.
set dfs.datanode.max.xcievers=8192;

-- ###### Insert data into the partitioned table
-- Dynamic-partition load of the day-partitioned detail table.
-- The partition spec must name the table's partition columns
-- (year, month, date_id); the earlier spec said `day`, which is not a
-- partition column of this table. The last three select expressions feed the
-- partition columns in that order.
insert into default.rpt_customer_coupon_details_partition partition (year, month, date_id)
select
    cc.id,
    cust.`name`             as cname,
    cust.phone              as cphone,
    cc.city                 as ccity,
    cc.coupon_id            as couponid,
    cpn.`name`              as couponname,
    act.city                as accity,
    cc.type                 as ctype,
    cc.`value`              as couponvalue,
    cpn.duration,
    gp.`name`               as pagename,
    cc.expire_at            as expired_at,
    cc.status,
    cpn.minCost,
    cpn.maxDiscount,
    cpn.minPay,
    cc.created_at,
    cc.updated_at,
    cc.source,
    act.title               as actionname,
    -- pay.status is the payment source (1..4); CASE always yields a value.
    case when pay.status = 1 then nvl(pay.price, 0) else 0 end as carprice,
    case when pay.status = 2 then nvl(pay.price, 0) else 0 end as taxiprice,
    case when pay.status = 3 then nvl(pay.price, 0) else 0 end as ticketprice,
    case when pay.status = 4 then nvl(pay.price, 0) else 0 end as buspoolprice,
    substr(to_date(cc.created_at), 1, 4) as year,
    substr(to_date(cc.created_at), 6, 2) as month,
    to_date(cc.created_at)               as date_id
from default.fct_customer_coupon cc
left join default.dim_customer cust
    on cc.customer_id = cust.id
left join default.dim_coupon cpn
    on cc.coupon_id = cpn.id
left join default.dim_giftpack gp
    on cc.giftpack_id = gp.id
left join default.dim_action act
    on cc.action_id = act.id
left join default.fct_order_payment_deduction pay
    on cc.id = pay.coupon_id;

-- ############# Export: must point at the files under the Hive table; pointing at just a path fails
sqoop export \ --connect ${mysqlconnect} \ --username ${username} \ --password ${password} \ --table rpt_customer_coupon_details_partition_year \ --fields-terminated-by "\t" \ --lines-terminated-by "\n" \ --input-null-non-string '\\N' \ --input-null-string '\\N' \ --update-key id \ --update-mode updateonly \ --export-dir '${hdfscon1}_partition_year/year=2016' \ --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,year" \ -m 4 \ -direct ############################################################################################# # hive表按年分区 ############################################################################################# select max(to_date(updated_at)),min(to_date(updated_at)) from rpt_customer_coupon_details limit 10; select distinct(to_date(updated_at)) from rpt_customer_coupon_details; select distinct(substr(to_date(updated_at),6,2)) from rpt_customer_coupon_details; drop table if exists default.rpt_customer_coupon_details_partition;CREATE TABLE default.rpt_customer_coupon_details_partition_year (
`id` int, `cname` string, `cphone` string, `ccity` string, `couponid` int, `couponname` string, `accity` string, `ctype` int, `couponvalue` double , `duration` string , `pagename` string , `expired_at` string , `status` int , `minCost` double , `maxDiscount` double , `minPay` double , `created_at` string , `updated_at` string, `source` string , `actionname` string , `carprice` double, `taxiprice` double, `ticketprice` double, `buspoolprice` double , `date_id` string ) comment 'rpt_customer_coupon_details' partitioned by (year string) row format delimited fields terminated by '\t' stored as textfile;########设置动态分区
-- Session settings for the dynamic-partition insert that follows:
-- turn dynamic partitioning on, allow every partition column to be dynamic
-- (nonstrict), and raise the per-node / total partition and file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- NOTE(review): dfs.datanode.* looks like a DataNode-side HDFS setting --
-- confirm that setting it per Hive session has any effect.
set dfs.datanode.max.xcievers=8192;

-- ###### Insert data into the partitioned table
-- Dynamic-partition load of the year-partitioned detail table; the last
-- select expression (year) feeds the partition column.
insert into default.rpt_customer_coupon_details_partition_year partition (year)
select
    cc.id,
    cust.`name`             as cname,
    cust.phone              as cphone,
    cc.city                 as ccity,
    cc.coupon_id            as couponid,
    cpn.`name`              as couponname,
    act.city                as accity,
    cc.type                 as ctype,
    cc.`value`              as couponvalue,
    cpn.duration,
    gp.`name`               as pagename,
    cc.expire_at            as expired_at,
    cc.status,
    cpn.minCost,
    cpn.maxDiscount,
    cpn.minPay,
    cc.created_at,
    cc.updated_at,
    cc.source,
    act.title               as actionname,
    -- pay.status is the payment source (1..4); CASE always yields a value.
    case when pay.status = 1 then nvl(pay.price, 0) else 0 end as carprice,
    case when pay.status = 2 then nvl(pay.price, 0) else 0 end as taxiprice,
    case when pay.status = 3 then nvl(pay.price, 0) else 0 end as ticketprice,
    case when pay.status = 4 then nvl(pay.price, 0) else 0 end as buspoolprice,
    to_date(cc.created_at)               as date_id,
    substr(to_date(cc.created_at), 1, 4) as year
from default.fct_customer_coupon cc
left join default.dim_customer cust
    on cc.customer_id = cust.id
left join default.dim_coupon cpn
    on cc.coupon_id = cpn.id
left join default.dim_giftpack gp
    on cc.giftpack_id = gp.id
left join default.dim_action act
    on cc.action_id = act.id
left join default.fct_order_payment_deduction pay
    on cc.id = pay.coupon_id;

# ############# Export (shell from here on)
# Variant 1: file-based export of the year=2016 partition.
# - The directory is written as "${hdfscon1}_partition_year/year=2016", the
#   same form the other partition exports in this file use; the earlier
#   '${hdfscon1}/year=2016' was single-quoted (never expanded) and lacked the
#   table suffix.
# - NOTE(review): updateonly never inserts new rows, and `year` is a Hive
#   partition column that is presumably absent from the data files even
#   though --columns lists it; verify before relying on this variant.
sqoop export \
    --connect "${mysqlconnect}" \
    --username "${username}" \
    --password "${password}" \
    --table rpt_customer_coupon_details_partition_year \
    --input-fields-terminated-by '\t' \
    --update-key id \
    --update-mode updateonly \
    --export-dir "${hdfscon1:?set hdfscon1 to the HDFS directory of rpt_customer_coupon_details}_partition_year/year=2016" \
    --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,year" \
    -m 1

# Variant 2: HCatalog-based export of the whole Hive table; Sqoop reads the
# table layout from the metastore instead of being given a directory and
# delimiters.
sqoop export \
    --connect "${mysqlconnect}" \
    --username "${username}" \
    --password "${password}" \
    --table rpt_customer_coupon_details_partition_year \
    --hcatalog-database default \
    --hcatalog-table rpt_customer_coupon_details_partition_year \
    --num-mappers 1

#################### MySQL table DDL
-- MySQL target for the year-partitioned export.
-- Fixes over the earlier DDL:
--   * `date_id` had a bare DEFAULT with no value (syntax error) -> DEFAULT NULL.
--   * `year` is part of the PRIMARY KEY, so it must be NOT NULL; it was
--     declared `default null`.
-- `year` stays in the primary key because it is the partitioning column.
-- NOTE(review): there is no partition for year >= 2019; rows from later years
-- will be rejected until a partition is added.
CREATE TABLE `rpt_customer_coupon_details_partition_year` (
    `id`            int(20)       NOT NULL,
    `cname`         varchar(255)  DEFAULT NULL,
    `cphone`        varchar(20)   DEFAULT NULL,
    `ccity`         varchar(50)   DEFAULT NULL,
    `couponid`      bigint(20)    DEFAULT NULL,
    `couponname`    varchar(255)  DEFAULT NULL,
    `accity`        varchar(50)   DEFAULT NULL,
    `ctype`         varchar(50)   DEFAULT NULL,
    `couponvalue`   double        DEFAULT NULL,
    `duration`      varchar(50)   DEFAULT NULL,
    `pagename`      varchar(255)  DEFAULT NULL,
    `expired_at`    varchar(50)   DEFAULT NULL,
    `status`        int(11)       DEFAULT NULL,
    `minCost`       double        DEFAULT NULL,
    `maxDiscount`   double        DEFAULT NULL,
    `minPay`        double        DEFAULT NULL,
    `created_at`    varchar(50)   DEFAULT NULL,
    `updated_at`    varchar(50)   DEFAULT NULL,
    `source`        varchar(50)   DEFAULT NULL,
    `actionname`    varchar(255)  DEFAULT NULL,
    `carprice`      double        DEFAULT NULL,
    `taxiprice`     double        DEFAULT NULL,
    `ticketprice`   double        DEFAULT NULL,
    `buspoolprice`  double        DEFAULT NULL,
    `date_id`       varchar(20)   DEFAULT NULL,
    `year`          int(20)       NOT NULL,
    PRIMARY KEY (`id`, `year`)
)
partition by range (`year`) (
    partition p2016 values less than (2017),
    partition p2017 values less than (2018),
    partition p2018 values less than (2019)
);

-- ###########################################################################
-- Hive table partitioned by quarter (HiveQL from here on)
-- ###########################################################################

-- Exploratory checks on the date range / distinct days / distinct months.
select max(to_date(updated_at)), min(to_date(updated_at))
from rpt_customer_coupon_details
limit 10;

select distinct(to_date(updated_at))
from rpt_customer_coupon_details;

select distinct(substr(to_date(updated_at), 6, 2))
from rpt_customer_coupon_details;

drop table if exists default.rpt_customer_coupon_details_partition_quarter;

create table default.rpt_customer_coupon_details_partition_quarter (
    `id`            int,
    `cname`         string,
    `cphone`        string,
    `ccity`         string,
    `couponid`      int,
    `couponname`    string,
    `accity`        string,
    `ctype`         int,
    `couponvalue`   double,
    `duration`      string,
    `pagename`      string,
    `expired_at`    string,
    `status`        int,
    `minCost`       double,
    `maxDiscount`   double,
    `minPay`        double,
    `created_at`    string,
    `updated_at`    string,
    `source`        string,
    `actionname`    string,
    `carprice`      double,
    `taxiprice`     double,
    `ticketprice`   double,
    `buspoolprice`  double,
    `date_id`       string
)
comment 'rpt_customer_coupon_details'
partitioned by (quarter string)
row format delimited fields terminated by '\t'
stored as textfile;

-- ######## Enable dynamic partitioning
-- Session settings for the dynamic-partition insert that follows:
-- turn dynamic partitioning on, allow every partition column to be dynamic
-- (nonstrict), and raise the per-node / total partition and file limits.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=10000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=150000;
-- NOTE(review): dfs.datanode.* looks like a DataNode-side HDFS setting --
-- confirm that setting it per Hive session has any effect.
set dfs.datanode.max.xcievers=8192;

-- ###### Insert data into the partitioned table
-- Dynamic-partition load of the quarter-partitioned detail table; the last
-- select expression (quarter) feeds the partition column.
insert into default.rpt_customer_coupon_details_partition_quarter partition (quarter)
select
    cc.id,
    cust.`name`             as cname,
    cust.phone              as cphone,
    cc.city                 as ccity,
    cc.coupon_id            as couponid,
    cpn.`name`              as couponname,
    act.city                as accity,
    cc.type                 as ctype,
    cc.`value`              as couponvalue,
    cpn.duration,
    gp.`name`               as pagename,
    cc.expire_at            as expired_at,
    cc.status,
    cpn.minCost,
    cpn.maxDiscount,
    cpn.minPay,
    cc.created_at,
    cc.updated_at,
    cc.source,
    act.title               as actionname,
    -- pay.status is the payment source (1..4); CASE always yields a value.
    case when pay.status = 1 then nvl(pay.price, 0) else 0 end as carprice,
    case when pay.status = 2 then nvl(pay.price, 0) else 0 end as taxiprice,
    case when pay.status = 3 then nvl(pay.price, 0) else 0 end as ticketprice,
    case when pay.status = 4 then nvl(pay.price, 0) else 0 end as buspoolprice,
    to_date(cc.created_at)  as date_id,
    -- Month number -> quarter: dividing by 3.1 and flooring maps months
    -- 1-3 to 0, 4-6 to 1, 7-9 to 2 and 10-12 to 3; the +1 makes it 1..4.
    floor(substr(to_date(cc.created_at), 6, 2) / 3.1) + 1 as quarter
from default.fct_customer_coupon cc
left join default.dim_customer cust
    on cc.customer_id = cust.id
left join default.dim_coupon cpn
    on cc.coupon_id = cpn.id
left join default.dim_giftpack gp
    on cc.giftpack_id = gp.id
left join default.dim_action act
    on cc.action_id = act.id
left join default.fct_order_payment_deduction pay
    on cc.id = pay.coupon_id;

# ############# Export (shell)
# File-based export of the quarter=1 partition.
# - Double quotes around --export-dir so the shell expands hdfscon1 (the
#   earlier single quotes passed the literal text "${hdfscon1}..." to Sqoop).
# - NOTE(review): updateonly never inserts new rows, and `quarter` is a Hive
#   partition column that is presumably absent from the data files even
#   though --columns lists it; verify before relying on this export.
sqoop export \
    --connect "${mysqlconnect}" \
    --username "${username}" \
    --password "${password}" \
    --table rpt_customer_coupon_details_partition_quarter \
    --fields-terminated-by "\t" \
    --lines-terminated-by "\n" \
    --update-key id \
    --update-mode updateonly \
    --export-dir "${hdfscon1:?set hdfscon1 to the HDFS directory of rpt_customer_coupon_details}_partition_quarter/quarter=1" \
    --columns="id,cname,cphone,ccity,couponid,couponname,accity,ctype,couponvalue,duration,pagename,expired_at,status,mincost,maxdiscount,minpay,created_at,updated_at,source,actionname,carprice,taxiprice,ticketprice,buspoolprice,date_id,quarter" \
    -m 4

-- ############### MySQL table DDL
-- MySQL target for the quarter-partitioned export.
-- `couponname` and `minCost` previously had a bare DEFAULT with no value
-- (syntax error); both are now DEFAULT NULL like the other nullable columns.
CREATE TABLE `rpt_customer_coupon_details_partition_quarter` (
    `ID`            bigint(20)     NOT NULL,
    `cname`         varchar(255)   DEFAULT NULL,
    `cphone`        varchar(20)    DEFAULT NULL,
    `ccity`         varchar(50)    DEFAULT NULL,
    `couponid`      bigint(20)     DEFAULT NULL,
    `couponname`    varchar(255)   DEFAULT NULL,
    `accity`        varchar(50)    DEFAULT NULL,
    `ctype`         varchar(50)    DEFAULT NULL,
    `couponvalue`   decimal(15,2)  DEFAULT NULL,
    `duration`      varchar(50)    DEFAULT NULL,
    `pagename`      varchar(255)   DEFAULT NULL,
    `expired_at`    varchar(50)    DEFAULT NULL,
    `status`        int(11)        DEFAULT NULL,
    `minCost`       decimal(15,2)  DEFAULT NULL,
    `maxDiscount`   decimal(15,2)  DEFAULT NULL,
    `minPay`        decimal(15,2)  DEFAULT NULL,
    `created_at`    varchar(50)    DEFAULT NULL,
    `updated_at`    varchar(50)    DEFAULT NULL,
    `source`        varchar(50)    DEFAULT NULL,
    `actionname`    varchar(255)   DEFAULT NULL,
    `carprice`      decimal(15,2)  DEFAULT NULL,
    `taxiprice`     decimal(15,2)  DEFAULT NULL,
    `ticketprice`   decimal(15,2)  DEFAULT NULL,
    `buspoolprice`  decimal(15,2)  DEFAULT NULL,
    `date_id`       varchar(20)    DEFAULT NULL,
    `quarter`       varchar(10)    DEFAULT NULL,
    PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;