
Offline Processing

1. Make sure all three virtual machines are running, then start the cluster.
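
A quick sanity check after starting the services is to list the Java daemons on each node. The exact start scripts depend on how your cluster was installed, so this is only a sketch:

start-dfs.sh     # NameNodes, DataNodes, JournalNodes, ZKFC (if HA is configured)
start-yarn.sh    # ResourceManager and NodeManagers
jps              # confirm the expected daemons are running on this node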

2. Check that hadoop1 can enter the Hive CLI.
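
A minimal check that does not need an interactive session (assuming hive is on the PATH of hadoop1):

hive -e "show databases;"   # succeeds only if Hive and its metastore are reachable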

3. Collect the data from MySQL into Hive as an offline batch.

4. Create the database and import the tables into MySQL. Note that this connects to the MySQL instance inside the virtual machine, not a local database.

CREATE DATABASE test CHARSET utf8;
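
How the source tables get into MySQL depends on what you were given. If it is a SQL dump, the import could look like the sketch below; the file name /root/order_status_log.sql is only a placeholder:

# run on the VM that hosts MySQL (192.168.174.10, the host referenced later in the DataX job)
mysql -uroot -p123456 test < /root/order_status_log.sql
mysql -uroot -p123456 -e "show tables;" test   # confirm the tables arrived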

5. Create the Hive tables; their format has to match the data.

create database data;
 use data;

Here a script is used to create the Hive tables:

touch hivecreate.sh
chmod +x hivecreate.sh
#!/bin/bash

# Create one Hive table per day for the date range given as arguments, e.g. 20200901 20200903.
beg_date=`date -d "${1}" +%s`
end_date=`date -d "${2}" +%s`

if((beg_date > end_date));then
 echo "beg_date must not be later than end_date"
 exit 1;
fi

currentDate=""
for((i=${beg_date};i<=${end_date};i=i+86400))
do
 currentDate=`date -d @${i} +%Y%m%d`
 echo "-----create order_status_log${currentDate}-----"
 hive -e "use data;create table order_status_log${currentDate}(id string,order_id string,order_status string,operate_time string) row format delimited fields terminated by ',';"
done


./hivecreate.sh 20200901 20200903
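
To confirm that one table per day was created (a sketch):

hive -e "use data; show tables;"   # expect order_status_log20200901 through order_status_log20200903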

6. With the tables created, move on to collection. Start by collecting a single table's data; later a script will handle the rest. The job files are in /root/datax/job.

cp log.json logg.json
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2
desc formatted <hive table name>
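
The two haadmin calls report which NameNode (nn1 or nn2) is currently active; its RPC address is what goes into the writer's defaultFS below. desc formatted, run inside the Hive CLI, prints the table's Location in HDFS, which is what goes into the writer's path. To double-check the target directory first, a quick look at the warehouse works (a sketch, assuming the default Hive warehouse layout):

hdfs dfs -ls /user/hive/warehouse/data.db/   # one directory per table created in step 5

The edited logg.json then looks like this:
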
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "order_id",
                            "order_status",
                            "operate_time"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "order_status_log20200901"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://192.168.174.10:3306/test"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://192.168.174.11:8020",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/data.db/order_status_log20200901",
                        "fileName": "order_status_log20200901",
                        "column": [
                            {
                                "name": "id",
                                "type": "STRING"
                            },
                            {
                                "name": "order_id",
                                "type": "STRING"
                            },
                            {
                                "name": "order_status",
                                "type": "STRING"
                            },
                            {
                                "name": "operate_time",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": ",",
                        "compress":"GZIP"
                    }
                }
            }
        ]
    }
}


python /root/datax/bin/datax.py /root/datax/job/logg.json

Collection finished.
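
To confirm that the rows actually landed in Hive, a quick count and a look at the files DataX wrote is enough (a sketch; the row count depends on your source data):

hive -e "use data; select count(*) from order_status_log20200901;"
hdfs dfs -ls /user/hive/warehouse/data.db/order_status_log20200901/   # gzip-compressed text files written by hdfswriter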

7. Automated collection

touch zdhive.sh
chmod +x zdhive.sh
#!/bin/bash

# Step 1: read the date currently used in the json file. The file /root/sh/date was created
# by hand beforehand, e.g. with: echo 20200901 > /root/sh/date
date=`cat /root/sh/date`

# Step 2: compute the day after that date
afterday_timestamp=$(( $(date -d "${date}" +%s) + 86400 ))
afterday=`date -d @${afterday_timestamp} +%Y%m%d`

# Global replace in the job file, e.g. turning order_status_log20200901 into order_status_log20200902
sed -i "s/order_status_log${date}/order_status_log${afterday}/g" /root/datax/job/logg.json

# Update the date stored in /root/sh/date so it stays consistent with the json file
echo ${afterday} > /root/sh/date

# Run datax
python /root/datax/bin/datax.py /root/datax/job/logg.json


Automated collection finished.
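
If the daily collection should run without being triggered by hand, a cron entry is one option. This is only a sketch; the path /root/sh/zdhive.sh is an assumption, adjust it to wherever you created the script:

# edit root's crontab with: crontab -e
# run every day at 01:00 and append the output to a log file
0 1 * * * /bin/bash /root/sh/zdhive.sh >> /root/sh/zdhive.log 2>&1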

8. Partitioned table

touch fenqu.sh
chmod +x fenqu.sh
# run in the Hive CLI:
create table order_status_log(id string,order_id string,order_status string,operate_time string)
partitioned by (day string) row format delimited fields terminated by ',';
touch result    # the file that will hold the per-day counts (/root/sh/result below)
#!/bin/bash

# The insert statement that loads the partitioned table is the part you have to fill in
# yourself before this script can run successfully.
# The partition column takes the value of date(operate_time).

beg_date=`date -d "${1}" +%s`
end_date=`date -d "${2}" +%s`

if((beg_date > end_date));then
 echo "beg_date must not be later than end_date"
 exit 1;
fi

currentDate=""
for((i=${beg_date};i<=${end_date};i=i+86400))
do
  currentDate=`date -d @${i} +%Y%m%d`
  hive -e "
  use data;
  set hive.exec.dynamic.partition.mode=nonstrict;
  set hive.exec.dynamic.partition=true;
  set hive.exec.max.dynamic.partitions=1000;
  insert into table data.order_status_log partition(day) select id,order_id,order_status,operate_time,date(operate_time) as day from data.order_status_log${currentDate};"
done

hive -S -e "select day,count(id) from data.order_status_log group by day;" > /root/sh/result


Loaded into the partitioned table.
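
To check that the dynamic partitions were created and to look at the per-day counts written to the result file (a sketch):

hive -e "use data; show partitions order_status_log;"   # expect day=2020-09-01, day=2020-09-02, ...
cat /root/sh/result                                     # the output of the group-by query above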

9. Docker

List the images
docker images
Create the container
docker run -itd --name=mysql-test -p 8888:3306 -e MYSQL_ROOT_PASSWORD=123456 1d7aba917169 /bin/bash
Enter the container
docker exec -it mysql-test /bin/bash
Start MySQL
service mysql start
Enter MySQL
mysql -uroot -p
MySQL needs to be granted permission for remote connections:
mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT
    -> OPTION;
Query OK, 0 rows affected, 1 warning (0.00 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)

mysql> exit
Bye
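
Note that GRANT ... IDENTIFIED BY only exists in MySQL 5.x. If your image ships MySQL 8 instead, the account has to exist before it can be granted privileges; a hedged equivalent would be:

CREATE USER IF NOT EXISTS 'root'@'%' IDENTIFIED BY '123456';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
FLUSH PRIVILEGES;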


10. Create the MySQL table that stores the per-day counts from the partitioned table

CREATE DATABASE test CHARSET utf8;

CREATE TABLE `month_count`(
`day` DATE,
`count` INT
)ENGINE=INNODB CHARSET=utf8;
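
Before writing the export script, it is worth verifying that the machine that will run it can reach the container through the mapped port. 192.168.174.9 is the Docker host used in the script below; adjust it to your own environment:

mysql -h192.168.174.9 -P8888 -uroot -p123456 -e "SHOW DATABASES;"   # the test database should appear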

Script:

[root@hadoop1 sh]# touch month_count.sh
[root@hadoop1 sh]# chmod +x month_count.sh
#!/bin/bash
user="root"
password="123456"
host="192.168.174.9"
port=8888

mysql_conn="mysql -h${host} -u${user} -P${port} -p${password}"

# read the day/count pairs produced by the Hive group-by and insert each into MySQL
cat /root/sh/result | while read day count
do
  $mysql_conn -e "INSERT INTO test.month_count VALUES('$day','$count')"
done
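
After running the script, the exported counts can be checked over the same connection (a sketch, assuming the script sits in /root/sh):

cd /root/sh && ./month_count.sh
mysql -h192.168.174.9 -P8888 -uroot -p123456 -e "SELECT * FROM test.month_count;"   # one row per day from /root/sh/result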


Done.

Additional notes:

  • None of this is guaranteed to be authoritative; if you spot any mistakes, you are very welcome to discuss them with me.