admin 管理员组

文章数量: 887016

Hive, Hue

文章目录

    • 一、Hive 概述
    • 二、Hive 数据类型和文件格式
      • 2.1 数据类型
        • 2.1.1 基本数据类型
        • 2.1.2 数据类型隐式转化
        • 2.1.3 集合数据类型
      • 2.2 文本文件的数据编码
        • 2.2.1 默认存储格式的默认分隔符
        • 2.2.2 读时模式
    • 三、DDL
      • 3.1 数据库操作
      • 3.2 建表语句
      • 3.3 内部表 与 外部表
      • 3.4 分区表
      • 3.5 分桶表
      • 3.6 修改表 与 删除表
    • 四、数据操作
      • 4.1 数据导入
      • 4.2 数据导出
    • 五、DQL
    • 六、函数
      • 6.1 Hive 内置函数
      • 6.2 窗口函数
      • 6.3 自定义函数
    • 七、元数据管理与存储
      • 7.1 元数据
      • 7.2 数据存储
    • 八、Hive 调优
      • 8.1 架构优化
      • 8.2 参数优化
      • 8.3 SQL 优化
      • 8.4 优化总结
      • 8.5 优化案例
    • 九、Hive 案例
      • 9.1 SQL
  • Hue 数据交互工具
    • 1. 安装、编译
    • 2. 整合 Hadoop, Hive

一、Hive 概述

二、Hive 数据类型和文件格式

2.1 数据类型

2.1.1 基本数据类型

类别类型
整型 IntegersTINYINT - 1 个字节的有符号整型
SAMLINT - 2 个字节有符号整型
INT - 4 个字节有符号整型
BIGINT - 8 个字节有符号整型
浮点数 Floating point numbersFLOAT - 单精度浮点数
DOUBLE - 双精度浮点数
定点数 Fixed point numbersDECIMAL - 17 字节,任意精度数字
字符串 StringSTRING - 不定长字符串
VARCHAR - (1 - 65535)长度的不定长字符串
CHAR - (1 - 255) 定长字符串
时间日期 DatetimeTIMESTAMP - 时间戳,纳秒级
DATE - 时间日期类型
布尔类型 BooleanBOOLEAN
二进制类型 Binary typesBINARY 二进制字节序列

Hive 中的基本数据类型都是通过 java 中的接口实现的,各类型与 java 数据类型的对应关系:

Hive 数据类型java 数据类型
TINYINTbyte
SAMLINTshort
INTint
BIGINTlong
FLOATfloat
DOUBLEdouble
DECIMAL
STRINGString
VARCHAR
CHAR
TIMESTAMP
DATE
BOOLEANboolean
BINARY

2.1.2 数据类型隐式转化

规律:

  • 底层逻辑:转换后数据精度不丢失

  • 任何 整数类型 都可以隐式转换为范围更广的类型

  • 所有 整数类型、float、string(数字), 都可以隐式转换为 double

  • 整型 都能隐式转换为 float

  • boolean 不能隐式转换

2.1.3 集合数据类型

  • array

  • map

  • struct

  • union

集合类型描述示例索引方式
array有序的,数据类型相同的,集合array(1, 2)[index]
mapkv 键值对,key 必须是基本数据类型,value 不限制map(‘a’, 1, ‘b’, 2)[key]
struct不同类型的, 集合, 类似 c 语言的结构体struct名.字段名
union不同类型的数据,但存储在同一字段的不同行中
hive (default)> select array(1, 2, 3);
OK
_c0
[1,2,3]-- 通过 [下标] 索引
hive (default)> select arr[1] from (select array(1, 2, 3) arr) tmp;
OK
_c0
2hive (default)> select map('a', 1, 'b', 2);
OK
_c0
{"a":1,"b":2}-- 通过 [key] 索引
hive (default)> select tmap['b'] from (select map('a', 1, 'b', 2) tmap) tmp;
OK
_c0
2-- struct 没有字段名时,默认字段名为 col_x
hive (default)> select struct('username1', 7, 1288.68);
OK
_c0
{"col1":"username1","col2":7,"col3":1288.68}-- name_struct() 可以指定字段名
hive (default)> select named_struct("name", "username1", "id", 7, "salary", 12880.68);
OK
_c0
{"name":"username1","id":7,"salary":12880.68}-- 通过 列名.字段名 的方式 索引
hive (default)> select userinfo.id, userinfo.name from (select named_struct("name", "username1", "id", 7, "salary", 12880.68) userinfo) tmp;
OK
id	name
7	username1

2.2 文本文件的数据编码

Hive 表中的数据都存储在 HDFS 上,其定义了默认的存储格式,也支持自定义存储格式

用户定义数据格式需要指定三个属性:

  • 列分隔符(通常为空格、"\t"、"\x001")

  • 行分隔符("\n")

  • 读取文件数据的方法。

2.2.1 默认存储格式的默认分隔符

分隔符名称说明
\n换行符分割行,每行一条记录
^A字段分割符分割字段
^B元素分隔符分割 array, map, struct 中的元素
^Ckv分隔符分割 map 中的 key 和 value

案例:

学生表

-- 字段和数据
id name age hobby(array) score(map)
666 thomas 18 read game java 90 hadoop 95-- 加上分隔符
666 ^A thomas ^A 18 ^A read ^B game ^A java ^C 90 ^B hadoop ^C 95-- 建表语句
create table s1 (id int,name string,age int,hobby array<string>,score map<string, int>
);-- 加载数据命令
load data local inpath '/home/hadoop/data/s1.data' into table s1;

2.2.2 读时模式

读取数据时才检查数据是否符合表的定义

Hive 采用 读时模式 schema on read , 加载数据时不进行数据格式的校验,读取数据时,如果不合法,则显示 Null

该模式的优点是写入、加载数据迅速

三、DDL

3.1 数据库操作

创建数据库

– 创建数据库,在HDFS上存储路径为 /user/hive/warehouse/*.db

-- 语法
CREATE DATABASE database_name IF NOT EXISTS database_name
COMMENT 'comments'
LOCATION 'hdfs_path'
;

查看数据库

-- 查看所有数据库
SHOW DATABASES;-- 查看数据库信息
DESC DATABASE database_name;
DESC DATABASE EXTENDED  database_name;

使用数据库

USE database_name

删除数据库

-- 删除空数据库
DROP DATABASE database_name;-- 删除不为空的数据库
DROP DATABASE database_name CASCADE;

3.2 建表语句

CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name    [(col_name data_type [column_constraint_specification] [COMMENT col_comment], ... [constraint_specification])][COMMENT table_comment][PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)][CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS][[ROW FORMAT row_format] [STORED AS file_format]][LOCATION hdfs_path][TBLPROPERTIES (property_name=property_value, ...)][AS select_statement];CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_nameLIKE existing_table_or_view_name[LOCATION hdfs_path];
  • EXTERNAL 关键字,创建外部表,否则创建内部表

    删除内部表时,数据和表同时被删除

    删除外部表时,只删除表

  • COMMENT 表注释

  • PARTITION BY 分区

  • CLUSTERED BY 分桶

  • SORTED BY 对桶表中的一个或多个字段排序

  • 存储语句

    ROW FORMAT DELIMITED
    [FIELDS TERMINATED BY char]
    [COLLECTION ITEMS TERMINATED BY char]
    [MAP KEYS TERMINATED BY char]
    [LINES TERMINATED BY char] | SERDE serde_name
    [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
    
  • SRORED AS SEQUENCEFILE(二进制序列文件,用于压缩) | TEXTFILE(默认) | RCFILE

  • LOCATION 表在 HDFS 上的存放位置

  • TBLPROPERTIES 表的属性

  • AS 表示根据其后面的查询语句的查询结果创建表

  • LIKE 用于复制现有的表结构,但是不复制数据

3.3 内部表 与 外部表

  • 默认情况下,创建内部表

  • EXTERNAL 关键字,用于创建外部表

  • 内部表可以转换为外部表

  • 删除内部表时,表和数据同时被删除

  • 删除外部表时,只删除表

  • 生产环境中,多使用外部表

案例:

数据

2;zhangsan;book,TV,code;beijing:chaoyang,shagnhai:pudong
3;lishi;book,code;nanjing:jiangning,taiwan:taibei
4;wangwu;music,book;heilongjiang:haerbin

内部表

-- 创建内部表
CREATE TABLE t1 (id INT,name STRING,hobby ARRAY<STRING>,addr MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;--  显示表的定义
DESC t1;-- 显示详细信息
DESC FORMATTED t1;-- 加载数据
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE t1;-- 查询数据文件
dfs -ls /user/hive/warehouse/mydb.db/t1;-- 删除表, 由于是内部表,表和数据同时被删除
DROP TABLE t1;

外部表

-- 创建内部表
CREATE EXTERNAL TABLE t2 (id INT,name STRING,hobby ARRAY<STRING>,addr MAP<STRING, STRING>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;--  显示表的定义
DESC t2;-- 显示详细信息
DESC FORMATTED t2;-- 加载数据
LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE t2;-- 查询数据文件
dfs -ls /user/hive/warehouse/mydb.db/t2;-- 删除表, 由于是外部表,表被删除, 数据还在
DROP TABLE t2;

内部表 与 外部表的转换

-- 内部表转换为外部表
ALTER TABLE t1 SET TBLPROPERTIES('EXTERNAL'='TRUE');-- 外部表转换为内部表
ALTER TABLE t1 SET TBLPROPERTIES('EXTERNAL'='FALSE');

3.4 分区表

Hive 查询时,会扫描整个表的数据,但是表的数据量大会导致全表扫描消耗资源大,效率低

Hive 中的分区表,将表的数据存储在不同的子目录中,每个目录对应一个分区,当查询只需要扫描部分数据时,可以使用分区表,提高查询效率

通常根据时间、地区等信息进行分区

创建分区表

CREATE TABLE IF NOT EXISTS t3 (id INT,name STRING,hobby ARRAY<STRING>,addr MAP<STRING, STRING>
)
PARTITIONED BY (dt STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ";"
COLLECTION ITEMS TERMINATED BY ","
MAP KEYS TERMINATED BY ":"
;-- 加载数据LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE T3 PARTITION(dt='2020-06-01');LOAD DATA LOCAL INPATH '/home/hadoop/data/t1.dat' INTO TABLE T3 PARTITION(dt='2020-06-02');

分区字段不是表中已经存在的数据,可以将分区字段看成虚拟列

查看分区

SHOW PARTITIONS t3;

新增分区并设置数据

-- 增加分区,不加载数据
ALTER TABLE t3 ADD PARTITION(dt='2020-06-03');-- 增加多个分区
ALTER TABLE t3 ADD PARTITION(dt='2020-06-04') PARTITION(dt='2020-06-05');-- 增加分区,并加载数据
-- 准备数据
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-07
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-08
hdfs dfs -cp /user/hive/warehouse/mydb.db/t3/dt=2020-06-01 /user/hive/warehouse/mydb.db/t3/dt=2020-06-06
-- 增加分区,指定分区位置,完成数据加载
ALTER TABLE t3 ADD 
PARTITION(dt='2020-06-07') LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-07'
PARTITION(dt='2020-06-08') LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-08'
;

修改分区的 HDFS 路径

ALTER TABLE t3 PARTITION(dt='2020-06-04') SET LOCATION '/user/hive/warehouse/mydb.db/t3/dt=2020-06-08';

删除分区

-- 删除一个 或者 多个,用逗号隔开
ALTER TABLE t3 DROP 
PARTITION(dt='2020-06-03'),
PARTITION(dt='2020-06-05')
;

3.5 分桶表

当单个的分区或者表的数据量过大,分区不能更细粒度的划分数据,就需要使用分桶 技术将数据划分成更细的粒度。将数据按照指定的字段进行分成多个桶中去,即将数 据按照字段进行划分,数据按照字段划分到多个文件当中去

  • MR 中: key.hashCode % reductTask

  • Hive 中: 分桶字段.hashCode % 分桶个数

测试数据

java 90
1 c 78
1 python 91
1 hadoop 80
2 java 75
2 c 76
2 python 80
2 hadoop 93
3 java 98
3 c 74
3 python 89
3 hadoop 91
5 java 93
6 c 76
7 python 87
8 hadoop 88

创建分桶表

CREATE TABLE course (id INT,name STRING,score INT
)
CLUSTERED BY (id) INTO 3 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
;-- 创建普通表
CREATE TABLE course_common (id INT,name STRING,score INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
;-- 普通表加载数据
LOAD DATA LOCAL INPATH '/home/hadoop/data/course.dat' INTO TABLE course_common;-- 分桶表加载数据, 得通过普通表加载
INSERT INTO TABLE course SELECT * FROM course_common;

分桶规则

  • 分桶字段.hashcode % 分桶数

  • 分桶加载数据是通过普通表加载的,INSERT … SELECT …

3.6 修改表 与 删除表

表名重命名

ALTER TABLE course_common RENAME TO course_common1;

字段名重命名

ALTER TABLE course_common CHANGE COLUMN id cid INT;-- 也可以转换字段类型,但是注意类型转换原则
ALTER TABLE course_common CHANGE COLUMN id cid STRING;

增加字段

ALTER TABLE course_common ADD COLUMNS (common STRING);

删除字段

ALTER TABLE course_common REPLACE COLUMNS (id STRING, cname STRING, score INT)

删除表

DROP TABLE course_common;

四、数据操作

4.1 数据导入

语法:

LOAD DATA [LOCAL] INPATH 'filepath'
[OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  • LOCAL 从本地文件上传到 HDFS 再添加进表中,否则从 HDFS 上移动到 Hive 指定的目录

  • OVERWRITE 覆盖原有数据,否则追加数据

  • PARTITION 分区

或者在创建表时,添加 LOCATION 参数,指定数据路径,完成创建表时导入数据

插入数据

-- 插入数据
INSERT INTO TABLE tabC
PARTITION(month='202001')
VALUES (5, 'wangwu', 'BJ'), (4, 'lishi', 'SH'), (3, 'zhangsan', 'TJ');-- 插入查询的结果数据
INSERT INTO TABLE tabC
PARTITION(month='202002')
SELECT id, name, area FROM tabC WHERE month='202001';-- 多表,多分区 插入
FROM tabC
INSERT TABLE tabC
PARTITION(month='202003')
SELECT id, name, area WHERE month='202002'
INSERT TABLE tabC
PARTITION(month='202004')
SELECT id, name, area WHERE month='202002'
;

创建表 并插入数据

-- 根据查询结果创建表
CREATE TABLE IF NOT EXISTS  tabD
AS SELECT * FROM tabC;

import 导入数据

IMPORT TABLE student2 PARTITION(month='202009')
FROM '/user/hive/warehouse/export/student';

4.2 数据导出

查询结果导出到本地

INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/data/tabC'
SELECT * FROM tabC;

查询结果格式化后 导出到本地

INSERT OVERWRITE LOCAL DIRECTORY '/home/hadoop/data/tabC'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ' '
SELECT * FROM tabC;

查询结果格式化后 导出到HDFS

去掉 LOCAL 关键字, 将输出路径改为 HDFS 文件路径

INSERT OVERWRITE DIRECTORY '/user/hadoop/data/course'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ' '
SELECT * FROM course;

dfs 命令到处数据导本地,其实是 hdfs 文件拷贝到本地的 get 命令

dfs -get 'hdfs_path' 'local_path';

hive 命令,导出数据到本地, 查询结果重定向到文件

hive -e "SELECT * FROM course" > a.log-- 如果没有开启 Hive, 则需要指定数据库名
hive -e "SELECT * FROM mydb.course" > a.log

export 导出数据到 HDFS, 此方式还包含表的元数据信息

EXPORT TABLE tabC TO 'hdfs_path';

export 导出的数据,可以用 import 命令导入到 表中

-- LIKE 让 表的结构一致
CREATE TABLE tabE LIKE tabC;IMPORT TABLE tabE 'hdfs_path';

清空表,仅限内部表

```sql
TRUNCATE TABLE table_name;
```

五、DQL

语法:

[WITH CommonTableExpression (, CommonTableExpression)*]    (Note: Only available starting with Hive 0.13.0)
SELECT [ALL | DISTINCT] select_expr, select_expr, ...FROM table_reference[WHERE where_condition][GROUP BY col_list][ORDER BY col_list][CLUSTER BY col_list| [DISTRIBUTE BY col_list] [SORT BY col_list]][LIMIT [offset,] rows]

注意

  1. WHERE 子句中不能使用列的别名

  2. WHERE 子句紧随 FROM 子句

  3. 正则表达式用 RLIKE 或者 REGEXP

  4. WHERE 子句不能有分组函数, HAVING 子句可以有分组函数

  5. HAVING 只能用于 GROUP BY 分组统计后

    WHERE 子句针对表中的数据发挥作用

    HAVING 针对查询结果(聚组以后的结果) 发挥作用

  6. 笛卡尔积

    在以下条件会产生笛卡尔积

    • 没有连接条件

    • 连接条件无效

    • 所有表中的所有行互相连接

  7. ORDER BY - 全局排序,对最终的结果进行排序,所以出现在 SELECT 语句的结尾

    只有一个 reduce

    排序字段要出现在select子句中

  8. SORT BY - 每个 MR 内部排序,

    每一个 reduce 内部进行排序, 得到局部有序的结果

  9. DISTRIBUTE BY - 分区, 根据字段,将特定的行发送到特定的 reduce 中,需要配合 SORT BY 实现分区排序

  10. CLUSTER BY - 简化分区排序语法,当 DISTRIBUTE BY 与 SORT BY 是用一个字段时使用

    只能升序, 不能指定排序规则

    SELECT * FROM emp DISTRIBUTE BY deptno SORT BY deptno;-- 等价于
    SELECT * FROM emp CLUSTER BY deptno;
    

六、函数

6.1 Hive 内置函数

查看系统函数

-- 查看系统所有函数
SHOW FUNCTIONS;-- 显示函数的用法
DESC FUNCTION function_name;-- 显示详细信息
DESC FUNCTION EXTENDED function_name;

日期函数

-- 当前日期
SELECT CURRENT_DATE;
-- 当前日期的时间戳
SELECT UNIX_TIMESTAMP();
-- 当前日期和时间, 不用括号也行
SELECT CURRENT_TIMESTAMP();-- 时间戳 -> 日期
SELECT FROM_UNIXTIME(1632881444);
SELECT FROM_UNIXTIME(1632881444, 'yyyyMMdd');
SELECT FROM_UNIXTIME(1632881444, 'yyyyMMdd HH:mm:ss');-- 日期 -> 时间戳, 注意日期格式和分隔符
SELECT UNIX_TIMESTAMP('2021-09-29 12:20:05');-- 计算时间差, 离现在更近的日期在前
SELECT DATEDIFF('2020-04-18', '2020-05-17');
SELECT DATEDIFF('2020-05-17', '2020-04-18');-- 查询当月第几天
SELECT DAYOFMONTH(CURRENT_DATE);-- 当月第一天
-- DATE_SUB(start_date, num_days) - Returns the date that is num_days before start_date.
SELECT DATE_SUB(CURRENT_DATE, DAYOFMONTH(CURRENT_DATE)-1);
-- date_sub()
SELECT DATE_SUB('2020-03-01', 2);-- 下个月第一天 (在当月第一天的基础上前推一个月)
-- ADD_MONTHS(start_date, num_months) - Returns the date that is num_months after start_date.
SELECT ADD_MONTHS(DATE_SUB(CURRENT_DATE, DAYOFMONTH(CURRENT_DATE)-1), 1);-- 字符串(yyyy-MM-dd 格式) -> 时间
SELECT TO_DATE('2020-01-01');
SELECT TO_DATE('2020-01-01 12:20:05');-- 标准时间格式 输出
SELECT DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss');
SELECT DATE_FORMAT(CURRENT_DATE(), 'yyyy-MM-dd HH:mm:ss');
SELECT DATE_FORMAT('2020-06-20', 'yyyy-MM-dd HH:mm:ss');-- 计算 emp 表中,每个人的工龄
-- (当前的日期 - hiredate) / 365
SELECT *, ROUND(DATEDIFF(CURRENT_DATE, hiredate) / 365, 1) working_years FROM emp;

字符串函数

-- 转换为 小写
SELECT LOWER("HELLO WORLD");-- 大写
SELECT UPPER(LOWER("HELLO WORLD"));-- 字符串长度
SELECT LENGTH(ename), ename FROM emp;-- 字符串拼接
SELECT CONCAT(empno, " ", ename) idname from emp;-- 指定分隔符,CONCAT_WS()
-- CONCAT_WS(separator, [string | array(string)]+) - returns the concatenation of the strings separated by the separator.
SELECT CONCAT_WS('.', 'www', array('lagou', 'com'));
SELECT CONCAT_WS(" ", ename, job) FROM emp;-- 子串, 5 表示从 5 开始, 下标从 1 开始
SELECT SUBSTR('www.lagou.com', 5);
SELECT SUBSTR('www.lagou.com', -5);
SELECT SUBSTR('www.lagou.com', 5, 5);-- 字符串切分, ‘\\.’ 是 正则表达式,注意符号转义
SELECT SPLIT('www.lagou.com', '\\.');

数学函数

-- 四舍五入
SELECT ROUND(3.1415926);
SELECT ROUND(3.1415926, 2);
SELECT ROUND(3.1415926, -2);	-- 往小数点前取几位-- 向上取整
SELECT CEIL(3.1415926);-- 向下取整
SELECT FLOOR(3.1415926);-- 绝对值, 幂运算,平方,开方,对数
SELECT ABS(-10.5) abs, POWER(-10.5, 2) power, SQRT(-10.5) sqrt,	-- 不合法的计算会返回 NULLLOG2(-10.5) log2,LOG10(-10.5) log10
;SELECT ABS(-10.5) abs, POWER(-10.5, 2) power, SQRT(100) sqrt,LOG2(100) log2,LOG10(100) log10
;

条件函数

if, case when, coalesce, isnull/isnotnull, nvl, nullif

-- 将员工工资按照等级分类,0-1500:1, 1500-3000:2, 3000 以上: 3-- if
SELECT sal, IF (sal<=1500, 1, if (sal<3000, 2, 3)) FROM emp;-- case when, 注意语法格式
SELECT sal, CASE WHEN sal<=1500 THEN 1WHEN sal<=3000 THEN 2ELSE 3 END sal_level
FROMemp
;
-- 如果是判断某个字段等于什么值,可以简化语法
SELECT ename, deptno,CASE deptno WHEN 10 THEN 'accounting'WHEN 20 THEN 'research'WHEN 30 THEN 'salse'ELSE 'unknown' END dept_name
FROMemp
;-- 返回第一个非空的参数, 可以用于缺省值
-- COALESCE(a1, a2, ...) - Returns the first non-null argument
-- 如果 comm 为 NULL,则返回 0
SELECT sal, COALESCE(comm, 0) FROM emp;-- ISNULL / ISNOTNULL
SELECT * FROM emp WHERE ISNULL(comm);
SELECT * FROM emp WHERE ISNOTNULL(comm);-- NVL, 缺省值
-- NVL(value,default_value) - Returns default value if value is null else returns value
SELECT sal, NVL(comm, 0) FROM emp;-- NULLIF(x, y), 相等为空,否则为 x
SELECT NULLIF(1, 2), NULLIF(2, 1), NULLIF(1, 1);

UDTD 函数, User Defined Table-Generating Functions

用户自定义 表生成函数

一行输入,多行输出

-- explode 炸裂函数
-- 将一行中,复杂的数据结构,map,array 等,拆分成多行
SELECT EXPLODE(ARRAY(1, 2, 3, 4, 5));
SELECT EXPLODE(MAP('a', 1, 'b', 2, 'c', 3));-- lateral view 侧视图?
-- 语法
LATERAL VIEW udtf(expression) tableAlias AS columnAlias (',' columnAlias)*
fromClause: FROM baseTable (lateralView)*-- 用法
WITH t1 AS (SELECT 'OK' cola, SPLIT('www.lagou.com', '\\.') colb
)
SELECT cola, colc
FROM t1
LATERAL VIEW EXPLODE(colb) t2 AS colc
;

UDTF 案例

-- 1. 案例 1-- 数据
id	tags
1	1,2,3
2	2,3
3	1,2-- 结果
id	e_tags
1	1
1	2
1	3
2	2
2	3
3	1
3	2-- sql
WITH t1 AS (
SELECT id,SPLIT(tags, '\\,') s_tags
FROMtab1
)
SELECT id, e_tags
FROM t1
LATERAL VIEW EXPLODE(s_tags) t2 AS e_tags
;-- 案例2-- 题目: 找到每个学院的最好成绩-- 数据
-- name | score_map
lisi|Chinese:90,Math:80,English:70 
wangwu|Chinese:88,Math:90,English:96 
maliu|Chinese:99,Math:65,English:60-- sql
-- 1. 炸裂 score_map
SELECT EXPLODE(score) AS (subject, mark) FROM stu_score;-- 2. 和 name 关联
SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
;-- 3. 输出每个学员的最好成绩
WITH tmp AS 
(SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
)
SELECT name, MAX(mark) max_score
FROM tmp
GROUP BY name;-- 以下 sql 无法运行,需要修改
SELECT name, mark
FROM
(SELECT name, subject, mark
FROM stu_score
LATERAL VIEW EXPLODE(score) t1 AS subject, mark
) t1
GROUP BY name
ORDER BY mark DESC
;

6.2 窗口函数

分析函数的一种

用于计算基于组的某种聚合值,但是聚合函数只能对每个组返回一行

窗口函数对于每个组可以返回多行

窗口函数指定了分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变化而变化。

OVER 关键字

OVER() 没有参数的话,默认是全部结果集

-- 查询 emp 表中,各个员工的工资占所有员工工资的百分比
-- 1. 汇总 总工资 作为新字段
-- 错误语法,SELECT ename, sal, sum(sal) FROM emp;
-- 应该添加 OVER 关键字
SELECT ename, sal, SUM(sal) OVER() AS total_sal FROM emp;-- 2. 员工工资 / 总工资 取百分比,用 CONCAT 拼接字符
SELECT ename, sal, SUM(sal) OVER() AS total_sal,CONCAT(ROUND(sal / SUM(sal) OVER()*100, 1), "%") ratio_sal
FROM emp;

PARTITION BY 子句

在 OVER 窗口中进行分区,对某一列进行分区统计,窗口大小是分区大小

-- 查询 员工姓名,薪水,以及各部门薪水总和
SELECT ename, sal, deptno, SUM(sal) OVER(PARTITION BY deptno) sal_sum_dep
FROM emp;

ORDER BY 子句

-- 增加排序, 此时的 SUM 是 从分组的第一行到当前行求和
SELECT ename, sal, deptno, SUM(sal) OVER(PARTITION BY deptno ORDER BY sal) sal_sum_dep
FROM emp;-- 输出
ename	sal	deptno	sal_sum_dep
MILLER	1300	1	1300CLARK	2450	10	2450
KING	5000	10	7450SMITH	800		20	800
ADAMS	1100	20	1900
JONES	2975	20	4875
SCOTT	3000	20	10875
FORD	3000	20	10875JAMES	950		30	950
MARTIN	1250	30	3450
WARD	1250	30	3450
TURNER	1500	30	4950
ALLEN	1600	30	6550
BLAKE	2850	30	9400

window 子句

ROWS BETWEENT ... AND ...

对窗口的结果做更细粒度的划分

  • UNDOUNDED PRECEDING 组内第一行数据

  • n PRECEDING 组内当前行的 前 n 行数据

  • CURRENT ROW 当前行数据

  • n FOLLOWING 组内当前行的 后 n 行数据

  • UNBOUNDED FOLLOWING 组内最后一行数据

-- 计算 分组内,第一行数据到当前行的累计和
SELECT ename, sal, deptno,SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sum_sal
FROM emp
;-- 计算 分组内,所有数据,即第一行数据到最后一行的总和
SELECT ename, sal, deptno,SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) sum_sal
FROM emp
;-- 计算 分组内,当前行以及前一行和后一行的和
SELECT ename, sal, deptno,SUM(sal) OVER(PARTITION BY deptno ORDER BY ename ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING ) sum_sal
FROM emp
;

排名函数

  • row_number, 行号连续,无并列排名

  • rank, 行号不连续,有并列排名

  • dense_rank, 行好连续,有并列排名

-- 三个排名函数的区别-- 各部门员工的工资排名
SELECTename, sal, deptno,ROW_NUMBER() OVER(PARTITION BY deptno ORDER BY sal DESC) row_number,RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) rank,DENSE_RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) dense_rank 
FROMemp
;
-- 输出
ename	sal	deptno	row_number	rank	dense_rank
MILLER	1300	1		1		1		1
KING	5000	10		1		1		1
CLARK	2450	10	 	2		2		2
SCOTT	3000	20		1		1		1
FORD	3000	20		2		1		1
JONES	2975	20		3		3		2
ADAMS	1100	20		4		4		3
SMITH	800		20		5		5		4
BLAKE	2850	30		1		1		1
ALLEN	1600	30		2		2		2
TURNER	1500	30		3		3		3
MARTIN	1250	30		4		4		4
WARD	1250	30		5		4		4
JAMES	950		30		6		6		5-- 每个部门的工资前 3 名
SELECTename, deptno, sal, sal_rank
FROM(SELECTename, deptno, sal,DENSE_RANK() OVER(PARTITION BY deptno ORDER BY sal DESC) sal_rankFROMemp) tmp
WHERE sal_rank <= 3
;

序列函数

  • lag, 返回当前行的前一行数据

  • lead, 返回当前行的下一行数据

  • first_value, 返回分组排序后,分组内的第一个值

  • last_value, 返回分组排序后,分组内的最后一个值

  • ntile, 将分组的数据按照顺序切分成 n 片,返回当前的切片序号/值

-- 序列函数的区别-- lag
SELECTuid, log_date,LAG(log_date) OVER (PARTITION BY uid) lag
FROMlogging
;-- lead
SELECTuid, log_date,LEAD(log_date) OVER (PARTITION BY uid ORDER BY log_date) lag
FROMlogging
;-- first_value
SELECTuid, log_date,FIRST_VALUE(log_date) OVER (PARTITION BY uid ORDER BY log_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_logdate
FROMlogging
;-- last_value
SELECTuid, log_date,LAST_VALUE(log_date) OVER (PARTITION BY uid ORDER BY log_date ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS first_logdate
FROMlogging
;-- ntile
-- 注意 ntile(n) 的参数
SELECTuid, log_date,NTILE(3) OVER (PARTITION BY uid ORDER BY log_date) log_ntiles
FROMlogging
;

SQL 题目

数据

-- 数据。uid dt status(1 正常登录,0 异常)
ab141	2021-03-09	1

SQL

-- 连续登录 7 天的用户 (扩展题目: 最长连续不活跃天数的用户分布)
-- 1. 剔除没有登录(status = 0)的日期
SELECT * FROM logging WHERE status = 1;-- 2. 将每个用户登录日期排名 row_number
SELECT *, ROW_NUMBER() OVER (PARTITION BY uid) rank
FROMlogging
WHERE status = 1
;-- 3. 每个用户的登录日期 - 排名名次,如果相同,则为同一天
SELECTuid,count(log_tag) log_cnt
FROM(SELECT *, DATE_SUB(log_date, ROW_NUMBER() OVER (PARTITION BY uid ORDER BY log_date)) log_tagFROMloggingWHERE status = 1) tmp
GROUP BY uid, log_tag	-- group by 有去重
HAVING log_cnt >= 3	-- 如果要查看各个连续天数的用户分布,可以不使用 hvaing 对分组聚合的结果进行筛选
;SELECTuid, log_tag,count(log_tag) log_cnt
FROM(SELECT *, DATE_SUB(dt, ROW_NUMBER() OVER (PARTITION BY uid ORDER BY dt)) log_tagFROMulogin	WHERE status = 1) tmp
GROUP BY uid, log_tag	-- group by 有去重
HAVING log_cnt >= 3	-- 如果要查看各个连续天数的用户分布,可以不使用 hvaing 对分组聚合的结果进行筛选
;
select uid, count(log_tag) log_cnt 
from (select *, date_sub(dt, row_number() over (partition by uid order by dt)) log_tag from (select * from (select *, row_number() over (partition by uid, dt) dt_sort from ulogin where status = 1) tmp where dt_sort = 1) t2
) t3
group by uid, log_tag
;
-- 求每个班级前三名的排名 以及 分数差,分数一样的并列-- 数据
-- sid class score
1 1901 90
2 1901 90
3 1901 83
4 1901 60
5 1902 66
6 1902 23
7 1902 99
8 1902 67
9 1902 87-- 结果
class score rank lagscore 
1901 	90 	1 	0
1901 	90 	1 	0
1901 	83 	2 	-7
1901 	60 	3 	-23
1902 	99 	1 	0
1902 	87 	2 	-12
1902 	67 	3 	-20-- 1. 排名
SELECT*, DENSE_RANK() OVER (PARTITION BY class ORDER BY score DESC) score_sort
FROMst
;-- 2. lag,相减
WITH tmp AS (
SELECT*, DENSE_RANK() OVER (PARTITION BY class ORDER BY score DESC) score_sort
FROMstu
)
SELECT class, score, score_sort,NVL(score - LAG(score) OVER (PARTITION BY class ORDER BY score DESC), 0) score_diff
FROMtmp
WHERE score_sort <= 3
;
-- 列转行-- 数据
-- id, course
1 java
1 hadoop
1 hive
1 hbase
2 java
2 hive
2 spark
2 flink
3 java
3 hadoop
3 hive
3 kafka-- 结果, 1 - 选修,0 - 未选
id java hadoop hive hbase spark flink kafka
1 	1 	1 	1 	0 	0 	0
1 	0 	1	0	1	1 	0
1 	1 	1 	0 	0 	0 	1-- 1. case when
-- 2. group by + sum() 因为 case when 每执行一次,都会有输出
SELECTid,SUM(CASE WHEN course='java' THEN 1 ELSE 0 END) AS java,SUM(CASE WHEN course='hadoop' THEN 1 ELSE 0 END) AS hadoop,SUM(CASE WHEN course='hive' THEN 1 ELSE 0 END) AS hive,SUM(CASE WHEN course='hbase' THEN 1 ELSE 0 END) AS hbase,SUM(CASE WHEN course='spark' THEN 1 ELSE 0 END) AS spark,SUM(CASE WHEN course='flink' THEN 1 ELSE 0 END) AS flink,SUM(CASE WHEN course='kafka' THEN 1 ELSE 0 END) AS kafka
FROMrowline1
GROUP BY id
;	
-- 行转列-- 数据
-- id1, id2, flag
a b 2
a b 1
a b 3
c d 6
c d 8
c d 8-- 结果
id1 id2 flag
a b 2|1|3
c d 6|8-- SQL
-- 1. 数据分组聚合,
-- collect_set(x) - Returns a set of objects with duplicate elements eliminated, 去重后的
-- collect_list(x) - Returns a list of objects with duplicates, 没有去重的
SELECTid1, id2, COLLECT_SET(flag) flag,COLLECT_LIST(flag) flag1
FROM rowline2
GROUP BY id1, id2
;-- 2. 将分组内的数据连接成列
SELECTid1, id2, CONCAT_WS('|', COLLECT_SET(CAST (flag AS STRING))) flag_str
FROMrowline2
GROUP BY id1, id2
;-- 列转行-- 1. 拆分列
SELECTid1, id2, SPLIT(flag_str, '\\|') flag_split
FROM	rowline3
;-- 2. 将拆分好的列中的数据,与其对应的字段关联
SELECTid1, id2, flag
FROMrowline3 LATERAL VIEW EXPLODE(SPLIT(flag_str, '\\|')) tmp AS flag
;

6.3 自定义函数

三类:

  • UDF(User Defined Function), 一进一出, 如 power()
  • UDAF(User Defined Aggregation Function), 聚集函数,多进一出, 如 sum(), count()
  • UDTF(User Defined Table-Generating Functions), 表生成函数,如 explode()

UDF 开发要点:

  • 继承 org.apache.hadoop.hive.ql.exec.UDF
  • 需要实现 evaluate 函数
  • UDF 必须有返回类型,可以返回 null,但是不能为 void

UDF 开放步骤:

  • 创建maven java 工程,添加依赖
  • 开发java类继承UDF,实现evaluate 方法
  • 将项目打包上传服务器
  • 添加开发的jar包
  • 设置函数与自定义函数关联
  • 使用自定义函数

案例:

扩展 NVL 函数的功能

-- nvl(value,default_value) - Returns default value if value is null else returns value-- 扩展函数的功能为
-- nvl(value,default_value) - 如果 value 是 null 或者 空字符串 或者 若干长度的空格符字符串 则返回 default_value
  1. 创建 maven java 工程,添加依赖

  2. 开发 java 类继承 UDF,实现 evaluate 方法

  3. 将项目打包上传服务器

  4. 添加开发的 jar 包

    ADD JAR /home/hadoop/hiveudf.jar;
    
  5. 设置函数与自定义函数关联

    创建临时函数

    CREATE TEMPORARY FUNCTION mynvl AS "com.lagou.hive.udf.nvl";
    

    创建永久函数

    • jar 包 上传到 HDFS

      hdfs dfs -put hiveudf.jar /user/hadoop/jar/
      
    • hive 中创建永久函数

      CREATE FUNCTION mynvl1 AS 'com.lagou.hive.udf.nvl' USING jar 'hdfs:/user/hadoop/jar/hiveudf.jar';
      
  6. 使用自定义函数

七、元数据管理与存储

7.1 元数据

在 Hive 中需要描述清楚表跟文件之间的映 射关系、列和字段之间的关系等等信息

Metadata 即元数据。元数据包含用 Hive 创建的 database、table、表的字段等元信 息。元数据存储在关系型数据库中。如 Hive 内置的 Derby、第三方如 MySQL 等

Metastore 即元数据服务,是 Hive 用来管理库表元数据的一个服务。有了它上层的 服务不用再跟裸的文件数据打交道,而是可以基于结构化的库表信息构建计算框架。

通过 metastore 服务将 Hive 的元数据暴露出去,而不是需要通过对 Hive 元数据库 mysql的访问才能拿到Hive的元数据信息; metastore 服务实际上就是一种 thrift 服 务,通过它用户可以获取到 Hive 元数据,并且通过 thrift 获取元数据的方式,屏蔽了数据库访问需要驱动,url,用户名,密码等细节

Metastore 三种配置方式

  • 内嵌模式

  • 本地模式

  • 远程模式, metastore 服务和 hive 运行在不同的集群节点上, 满足高可用

HiveServer 2

HiveServer 2 是一个服务端接口,使远程客户端可以执行对 Hive 的查询并返回结果。

基于 Thrift RPC 的实现是 HiveServer 的改进版本,并支持多客户端并发和身份验 证,启动 HiveServer 2 服务后,就可以使用 jdbc、odbc、thrift 的方式连接

HiveServer 2 的作用:

  • 为 Hive 提供了一种允许客户端远程访问的服务
  • 基于 thrift 协议,支持跨平台,跨编程语言对 Hive 访问
  • 允许远程访问 Hive

HCatalog

Hive 目录 服务

HCatalog 提供了一个统一的元数据服务,允许不同的工具如 Pig、MapReduce 等通 过 HCatalog 直接访问存储在 HDFS 上的底层文件

HCatalog 提供了一个称为 hcat 的命令行工具。这个工具和 Hive 的命令行工具类 似,两者最大的不同就是 hcat 只接受不会产生 MapReduce 任务的命令。

7.2 数据存储

八、Hive 调优

影响 Hive 效率的不仅仅是数据量过大; 数据倾斜、数据冗余、job 或 I/O 过多、 MapReduce 分配不合理等因素都对 Hive 的效率有影响。

对 Hive 的调优既包含对 HiveQL 语句本身的优化,也包含 Hive 配置项 和 MR 方面的调 整

8.1 架构优化

  • 执行引擎
  • 优化器
  • 分区、分桶
  • 文件格式
  • 数据压缩

执行引擎

Hive 支持多种执行引擎,分别是 MapReduce、Tez、Spark、Flink。可以通过 hive-site.xml 文件中的 hive.execution.engine 属性控制

优化器

与关系型数据库类似,Hive 会在真正执行计算之前,生成和优化逻辑执行计划与物理 执行计划。Hive 有两种优化器: Vectorize (向量化优化器) 与 Cost-Based Optimization (CBO 成本优化器)

  • 矢量化查询执行

    矢量化查询(要求执行引擎为 Tez )执行通过一次批量执行 1024 行而不是每行一行来提 高扫描,聚合,过滤器和连接等操作的性能,这个功能一显着缩短查询执行时间

    set hive.vectorized.execution.enabled = true;
    -- 默认 false
    set hive.vectorized.execution.reduce.enabled = true; 
    -- 默认 false
    

    矢量化查询,必须用 ORC 格式存储数据

  • 成本优化器

    Hive 的 CBO 是基于 apache Calcite 的,Hive 的 CBO 通过查询成本(有analyze 收集的统 计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,使用 CBO 的配置如下 :

    SET hive.cbo.enable=true;
    SET hive.compute.query.using.stats=true;
    SET hive.stats.fetch.column.stats=true;
    SET hive.stats.fetch.partition.stats=true;
    

    定期执行表的分析,分析后的数据放在元数据库中

分区表

对于一张比较大的表,将其设计成分区表可以提升查询的性能,对于一个特定分区的 查询,只会加载对应分区路径的文件数据,所以执行速度会比较快

分区字段的选择是影响查询性能的重要因素,尽量避免层级较深的分区,这样会造成 太多的子文件夹

常见的分区字段:

  • 日期或时间。如year、month、day或者hour,当表中存在时间或者日期字段时
  • 地理位置。如国家、省份、城市等
  • 业务逻辑。如部门、销售区域、客户等等

分桶表

与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。

分桶可以加快数据采样,也可以提升 join 的性能(join 的字段是分桶字段),因为分桶可以确保某个 key 对应的数据在一个特定的桶内(文件),巧妙地选择分桶字段可以大幅 度提升 join 的性能。

文件格式

在 HiveQL 的 create table 语句中,可以使用 stored as … 指定表的存储格式。

Hive 表支持的存储格式有 TextFile、SequenceFile、RCFile、ORC、Parquet 等。

生产环境中绝大多数表都采用 TextFile、 ORC、Parquet 存储格式之一。

TextFile 是最简单的存储格式,它是纯文本记录,也是 Hive 的默认格式。其磁盘开销大,查询效率低,更多的是作为跳板来使用。RCFile、ORC、Parquet 等格式的表都不能由文件直接导入数据,必须由 TextFile 来做中转。

Parquet 和 ORC 都是 Apache 旗下的开源列式存储格式。列式存储比起传统的行式存储更适合批量 OLAP 查询,并且也支持更好的压缩和编码。选择 Parquet 的原因主要是它支持 Impala 查询引擎,并且对 update、delete 和事务性操作需求很低。

数据压缩

压缩技术可以减少 map 与 reduce 之间的数据传输,从而可以提升查询性能,关于压 缩的配置可以在 hive 的命令行中或者 hive-site.xml 文件中进行配置。

压缩的编码器可以配置在 mapred-site.xml, hive-site.xml 中:

-- 中间结果压缩
SET hive.intermediate.compression.codec=org.apache.hadoop.io.compre ss.SnappyCodec ;
-- 输出结果压缩
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodc

8.2 参数优化

  • 本地模式
  • 严格模式
  • JVM 重用
  • 并行执行
  • 推测还行
  • 合并小文件
  • Fetch模式

本地模式

当 Hive 处理的数据量小时,启动分布式去处理数据会浪费资源,可以启用本地模式

SET hive.exec.mode.local.auto=true; -- 默认 false
SET hive.exec.mode.local.auto.inputbytes.max=50000000; SET hive.exec.mode.local.auto.input.files.max=5; -- 默认 4

严格模式

强制不允许用户执行 3 种有风险的 HiveQL 语句

  • 查询分区表时,不限定分区列的语句
  • 两表 join 产生笛卡尔积的语句
  • 用 order by 排序,但是没有指定 limit 语句

开启严格模式的参数设置

-- 要开启严格模式,需要将参数 hive.mapred.mode 设为strict(缺省值)。 
--该参数可以不在参数文件中定义,在执行SQL之前设置(sethive.mapred.mode=nostrict )
SET sethive.mapred.mode=nostrict;

JVM 重用

默认情况下,Hadoop 会为一个 map 或者 reduce 启动一个 JVM。这种情况下,会并行执行 map 和 reduce

当遇到 map 或者 reduce 是几秒钟的轻量级的 job 时,JVM 启动进程所耗费的时间会比执行的时间要长,通过重用 JVM,共享 JVM 以串行的方式运行 map 或者 reduce

开启 JVM 重用,需要配置最大的 task 数量,其默认值为 1, 如果设置为 -1 则表示不限制 重用 JVM 的 task 的数量

-- 代表同一个MR job中顺序执行的5个task重复使用一个JVM,减少启动和关闭的开销
SET mapreduce.job.jvm.numtasks=5;

并行执行

Hive 的查询通常会被转换成一系列的 stage,这些 stage 之间并不是一直相互依赖的, 可以并行执行这些 stage

配置:

SET hive.exec.parallel=true; -- 默认false
SET hive.exec.parallel.thread.number=16; -- 默认8

并行执行可以增加集群资源的利用率,如果集群的资源使用率已经很高了,那么并行执行的效果不会很明显。

推测执行

在分布式集群环境下,因为程序 Bug、负载不均衡、资源分布不均等原因,会造成同 一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任 务(比如一个作业的某个任务进度只有 50%,而其他所有任务已经运行完毕),则这 些任务会拖慢作业的整体执行进度。

为了避免这种情况发生,Hadoop 采用了推测执行机制,它根据一定的规则推测出 “拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理 同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。

配置:

set mapreduce.map.speculative=true
set mapreduce.reduce.speculative=true
set hive.mapred.reduce.tasks.speculative.execution=true

合并小文件

  • 在 map 执行前 合并小文件,减少 map 数量
-- 缺省参数
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInput Format;
  • 在 MR 任务结束时 合并小文件
-- 在 map-only 任务结束时合并小文件,默认true
SET hive.merge.mapfiles = true;-- 在 map-reduce 任务结束时合并小文件,默认false
SET hive.merge.mapredfiles = true;-- 合并文件的大小,默认256M
SET hive.merge.size.per.task = 268435456;-- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件 merge
SET hive.merge.smallfiles.avgsize = 16777216;

Fetch 模式

Fetch 模式是指 Hive 中对某些情况的查询可以不必使用 MapReduce 计算。select
col1, col2 from tab;
可以简单地读取表对应的存储目录下的文件,然后输出查询结果到控制台。在开启 fetch 模式之后,在全局查找、字段查找、limit 查找等都不启动 MapReduce。

-- Default Value: minimal in Hive 0.10.0 through 0.13.1, more in Hive 0.14.0 and later
hive.fetch.task.conversion = more

8.3 SQL 优化

列裁剪 和 分区裁剪

  • 列裁剪,查询时,只读取需要的列
  • 分区裁剪,查询时,只读取需要的分区

sort by 代替 order by

order by 是某字段的全局排序,会导致所有的 map 端数据都进入同一个 reducer 中

sort by 会根据实际情况,启动多个 reducer 进行排序,每个 reducer 内部有序

为了控制 map 端数据分配到 reducer 的 key, 需要配合 distribute by 一同使用,否则 map 端数据会随机分配到 不同的 reducer

group by 代替 count(distinct)

count(distinct) 的逻辑会导致很少的 reducer 处理

-- 原始 sql
select count(distinct uid)
from tab1;-- 优化
select count(1)
from (select uidfrom tab1group by uid
) tmp
;

外层的 count(1) 和 group by 会启动两个 MR job, 因此确保数据量大到启动两个 job 的耗时远小于 计算耗时才划算

group by 配置

  • map 端预聚合

group by 时,如果先起一个 combiner 在 map 端做部分预聚合,可以有效减少shuffle 数据量。

set hive.map.aggr = true;

Map端进行聚合操作的条目数

set hive.groupby.mapaggr.checkinterval = 100000;

当 map 预先聚合的行数超过该值时,会拆分 job

  • 数据倾斜均衡配置

group by 时如果某些key对应的数据量过大,就会发生数据倾斜。Hive 自带了一个均 衡数据倾斜的配置项 hive.groupby.skewindata ,默认值 false

其实现方法是在 group by 时启动两个 MR job。第一个 job 会将 map 端数据随机输入 reducer,每个 reducer 做部分聚合,相同的 key 就会分布在不同的 reducer 中。第二 个 job 再将前面预处理过的数据按key聚合并输出结果,这样就起到了均衡的效果

join 基础优化

Hive join 三种方式

  1. common join

    操作简单,但是性能较差(需要将数据分区,有 shuffle 阶段)

  2. map join

    大表与小表连接,小表数据量(25 M) 可以完全加载到内存
    不会有 reduce 阶段,连接在 map 端完成

  3. bucket map join

    通过两个表分桶在执行连接时会将小表的每个分桶映射成 hash 表,每个 task 节点都需要这个小表的所有 hash 表,但是在执行时只需要加载该 task 所持有大表分 桶对应的小表部分的 hash 表就可以,所以对内存的要求是能够加载小表中最大的 hash 块即可

    小表与大表的分桶数量需要是倍数关系,这个是因为分桶策略决定的,分桶时会根据分桶字段对桶数取余后决定哪个桶的,所以要保证成倍数关系。

  • 利用 map join 特性

  • 利用 分桶表 map join

处理空值 或者 无意义值

这些值的数据量大的时候,会导致集中到同一个分区里,需要用 where 语句过滤掉

需要保留数据的话,将这些数据 添加随机数 打散到不同的分区

key 数据倾斜

添加随机数

调整 map 数

根据

  • 输入文件总数
  • 输入文件大小
  • HDFS 文件块大小

调整 map 数量

对于 小文件采用策略是 合并

对于 复杂文件 采用的策略是 增加 map 数

-- map 数量的计算公式
computeSliteSize(max(minSize, min(maxSize, blocksize))) = blocksize
-- 其中
-- minSize : mapred.min.split.size (默认值1)
-- maxSize : mapred.max.split.size (默认值256M)-- 调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。
-- 建议用set的方式,针对SQL语句进行调整。

调整 reduce 数

参数 hive.exec.reducers.bytes.per.reducer 用来设定每个reducer能够处 理的最大数据量,默认值 256 M

参数 用来设定每个 job 的最大 reducer 数量,默认值 999(1.2 版本之前)或 1009(1.2版本之后)

-- reducer 个数
reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)-- 即, min(输入总数据量 / 256 M, 1009) 个

reducer 数量太多,会产生大量小文件;太少,每个 reducer 需要处理更多的数据,会耗费更多的时间

8.4 优化总结

深入理解 Hadoop 的核心能力,对Hive优化很有帮助。Hadoop/Hive 处理数据过
程,有几个显著特征:

  • 不怕数据多,就怕数据倾斜
  • 对 job 数比较多的作业运行效率相对比较低,比如即使有几百行的表,多次关联 多次汇总,产生十几个jobs,执行也需要较长的时间。MapReduce 作业初始化 的时间是比较长的
  • 对sum、count等聚合操作而言,不存在数据倾斜问题
  • count(distinct) 效率较低,数据量大容易出问题

从大的方面来说,优化可以从几个方面着手:

  • 好的模型设计,事半功倍
  • 解决数据倾斜问题。仅仅依靠参数解决数据倾斜,是通用的优化手段,收获有 限。开发人员应该熟悉业务,了解数据规律,通过业务逻辑解决数据倾斜往往更 可靠
  • 减少 job 数
  • 设置合理的map、reduce task 数
  • 对小文件进行合并,是行之有效的提高Hive效率的方法
  • 优化把握整体,单一作业的优化不如整体最优

8.5 优化案例

建表语句

create table if not exists tuning.student_txt(s_no string comment '学号',s_name string comment '姓名',s_birth string comment '出生日期',s_age int comment '年龄',s_sex string comment '性别',s_score int comment '综合得分',s_desc string comment '自我介绍'
)
row format delimited
fields terminated by '\t';

SQL 案例

查询 student_txt 表中,每个年龄最晚出生的人的出生日期,将结果存入到 student_stat 表中

-- 建表语句
CREATE TABLE student_stat (age INT,brith STRING
)
PARTITIONED BY (tp STRING)	-- tp 表示 最大 或者 最小,分区存放
;-- 开启动态分区
set hive.exec.dynamic.partition.mode=true;-- 插入数据
INSERT OVERWRITE TABLE student_stat PARTITION(tp)
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;

查看执行计划

-- 查看执行计划
EXPLAIN
SELECT * FROM student_txt;-- 年龄大于 22 的,出生日期最晚的前 88 名学生的成绩
EXPLAIN
SELECT s_score, MAX(s_birth)
FROM student_txt
WHERE s_age > 22
GROUP BY s_score
LIMIT 88
;-- 
EXPLAIN
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;

一条 Hive SQL 语句会包含一个或多个Stage,不同的 Stage 间会存在着依赖关系

一个Stage可以是:

  • Mapreduce 任务(最耗费资源)
  • Move Operator (数据移动)
  • Stats-Aggr Operator (搜集统计数据)
  • Fetch Operator (读取数据)等

默认情况下,Hive一次只执行一个stage

执行过程中的要点

  • 问题1: SQL 执行过程中有多少个 Stage(job)
  • 问题2: 为什么在 Stage-1、Stage-9 中有 9 个 Map Task、9 个 Reduce Task
  • 问题3: SQL 语句是否能优化,如何优化
EXPLAIN
INSERT OVERWRITE TABLE student_stat PARTITION(tp)
SELECT s_age, MAX(s_birth) stat, 'max' tp
FROM student_txt
GROUP BY s_age
UNION ALL
SELECT s_age, MIN(s_birth) stat, 'min' tp
FROM student_txt
GROUP BY s_age
;-- EXPLAIN
STAGE DEPENDENCIES:Stage-1 is a root stageStage-2 depends on stages: Stage-1, Stage-9Stage-8 depends on stages: Stage-2 , consists of Stage-5, Stage-4, Stage-6Stage-5Stage-0 depends on stages: Stage-5, Stage-4, Stage-7Stage-3 depends on stages: Stage-0Stage-4Stage-6Stage-7 depends on stages: Stage-6Stage-9 is a root stage

stage 结构图

0 1 2 3 4 5 6 7 9

问题 2

-- map 切片大小
hive (tuning)> set mapred.max.split.size;
mapred.max.split.size=256000000-- reducer 大小
hive (tuning)> set hive.exec.reducers.bytes.per.reducer;
hive.exec.reducers.bytes.per.reducer=256000000-- 输入 Map Task 的数据大小
Data size: 2193190912-- 得出 mapper 数量
2193190912 / 256000000 = 8.567152 = 9

SQL 优化

  • 减少 Map Reduce task 的数量

    即 增大 map 和 reduce 的大小,但是得注意选择合适的 map reduce 数量

  • 减少 stage

    使用Hive多表插入语句。可以在同一个查询中使用多个 insert 子句,这样的好处是 只需要扫描一次源表就可以生成多个不相交的输出。

    from tab1
    insert overwrite table tab2 partition (age)
    select name, address, school,age
    insert overwrite table tab3
    select name, address
    where age>24;
    

    从同一张表选取数据,可以将选取的数据插入其他不同的表中(也可以是相同的表)

    将 “from 表名”,放在SQL语句的头部

    -- 优化后的 SQL
    -- 开启动态分区插入
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;-- 优化后的SQL, 只有 5 个 stage, 减少了一次扫描全表
    -- explain
    from student_txt
    insert overwrite table student_stat partition(tp)
    select s_age, max(s_birth) stat, 'max' tp
    group by s_age
    insert overwrite table student_stat partition(tp)
    select s_age, min(s_birth) stat, 'min' tp
    group by s_age;
    

善用文件格式,优化 SQL

-- 创建表插入数据,改变表的存储格式
create table student_parquet
stored as parquet
as
select * from student_txt;
select count(1) from student_parquet;-- 仅创建表结构,改变表的存储格式,但是分区的信息丢掉了
create table student_stat_parquet
stored as parquet
as
select * from student_stat where 1>2;-- 重新创建表
drop table student_stat_parquet;
create table student_stat_parquet
(age int,
b string)
partitioned by (tp string)
stored as parquet;

SQL 优化总结:

  • 减少了对数据源的扫描
  • 使用了列式存储格式

九、Hive 案例

数据:

-- 

建表语句

-- 创建数据库
-- DROP DATABASE sale CASCADE;
CREATE DATABASE IF NOT EXISTS sale;-- 创建普通表
CREATE TABLE sale.dimdate_ori (dt date,yearmonth int,year smallint,month tinyint,day tinyint,week tinyint,weeks tinyint,quat tinyint,tendays tinyint,halfmonth tinyint
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;CREATE TABLE sale.sale_ori (orderid string,locationid string,dt date
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;CREATE TABLE sale.saledetail_ori (orderid string,rownum int,goods string,num int,price double,amount double
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ","
;-- 创建 ORC 存储格式的表
CREATE TABLE sale.dimdate (dt date,yearmonth int,year smallint,month tinyint,day tinyint,week tinyint,weeks tinyint,quat tinyint,tendays tinyint,halfmonth tinyint
)
STORED AS ORC
;CREATE TABLE sale.sale (orderid string,locationid string,dt date
)
STORED AS ORC
;CREATE TABLE sale.saledetail (orderid string,rownum int,goods string,num int,price double,amount double
)
STORED AS ORC
;

加载数据

-- 加载数据到普通表
USE sale;LOAD DATA LOCAL INPATH '/home/hadoop/data/tbDate.dat' OVERWRITE INTO TABLE dimdate_ori;
LOAD DATA LOCAL INPATH '/home/hadoop/data/tbSale.dat' OVERWRITE INTO TABLE sale_ori;
LOAD DATA LOCAL INPATH '/home/hadoop/data/tbSaleDetail.dat' OVERWRITE INTO TABLE saledetail_ori;-- 导入数据到 ORC 表
INSERT INTO TABLE dimdate SELECT * FROM dimdate_ori;
INSERT INTO TABLE sale SELECT * FROM sale_ori;
INSERT INTO TABLE saledetail SELECT * FROM saledetail_ori;

9.1 SQL

  1. 按年统计销售额
-- 根据 orderid join 两个表 saledetail 和 sale
-- 按照 年 分组聚合SELECT year(s.dt) year, round(sum(sd.amount)/10000, 2) amount
FROMsaledetail AS sd
JOIN sale AS s
ON sd.orderid = s.orderid
GROUP BY year(s.dt)
;
  1. 销售金额在 10W 以上的订单
SELECTorderid, round(sum(amount), 2) amount_cal
FROMsaledetail
GROUP BY orderid
HAVING sum(amount) > 100000
;
  1. 每年销售额的差值
SELECTyear, amount, lag(amount) OVER (ORDER BY year) last_year_amount, round(amount - lag(amount) OVER (ORDER BY year), 2) diff
FROM (SELECT year(s.dt) year, round(sum(sd.amount)/10000, 2) amountFROMsaledetail AS sdJOIN sale AS sON sd.orderid = s.orderidGROUP BY year(s.dt)
) tmp
;
  1. 年度订单金额前10位(年度、订单号、订单金额、排名)
SELECTyear, orderid, amount_id amount, amount_rank
FROM (SELECTyear, orderid, amount_id,row_number() OVER (PARTITION BY year ORDER BY amount_id DESC) amount_rankFROM (SELECTyear(s.dt) year, sd.orderid, round(sum(sd.amount), 2) amount_idFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY s.dt, sd.orderid) tmp
) tmp2
WHERE amount_rank <= 10-- WITH table_name AS ()
WITH tmp AS (SELECTyear(s.dt) year, sd.orderid, round(sum(sd.amount), 2) amount_idFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY s.dt, sd.orderid
)
SELECTyear, orderid, amount_id amount, amount_rank
FROM (SELECTyear, orderid, amount_id,row_number() OVER (PARTITION BY year ORDER BY amount_id DESC) amount_rankFROM tmp
) tmp2
WHERE amount_rank <= 10
;
  1. 季度订单金额前10位(年度、季度、订单id、订单金额、排名)
-- 
SELECTyear, quat, orderid, round(amount_order, 2) amount, amount_rank
FROM (SELECTyear,quat,orderid,amount_order,row_number() OVER (PARTITION BY year, quat ORDER BY amount_order DESC) amount_rankFROM (SELECTyear(tmp.dt) year, d.quat, orderid, sum(amount) amount_orderFROM (SELECTs.dt, sd.orderid, sd.amountFROMsaledetail sdJOIN sale sON sd.orderid = s.orderid) tmp	-- JOIN dimdate dON tmp.dt = d.dtGROUP BY year(tmp.dt), d.quat, tmp.orderid) tmp2	-- 
) tmp3
WHERE amount_rank <= 10
;-- WITH
WITH tmp1 AS (SELECTyear(tmp.dt) year, d.quat, orderid, sum(amount) amount_orderFROM (SELECTs.dt, sd.orderid, sd.amountFROMsaledetail sdJOIN sale sON sd.orderid = s.orderid) tmp	-- JOIN dimdate dON tmp.dt = d.dtGROUP BY year(tmp.dt), d.quat, tmp.orderid
)
SELECTyear, quat, orderid, round(amount_order, 2) amount, amount_rank
FROM (SELECTyear,quat,orderid,amount_order,row_number() OVER (PARTITION BY year, quat ORDER BY amount_order DESC) amount_rankFROM tmp1
) tmp3
WHERE amount_rank <= 10
;
  1. 求所有交易日中订单金额最高的前10位
WITH tmp AS (SELECTs.dt, sd.orderid, sum(sd.amount) amountFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY s.dt, sd.orderid
)
SELECTdt, orderid, round(amount, 2) amount, amount_rank
FROM (SELECTdt, orderid, amount, row_number() OVER (PARTITION BY dt ORDER BY amount DESC) amount_rankFROMtmp
) tmp2
WHERE amount_rank <= 10
;
  1. 每年度销售额最大的交易日
WITH tmp AS (SELECTs.dt, sum(amount) amountFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY s.dt
)
SELECTyear(dt) year,round(max(amount), 2) max_amount_day
FROMtmp
GROUP BY year(dt)
;-- 结果中体现 交易日期
WITH tmp1 AS (SELECTdt, amount, row_number() OVER (PARTITION BY year(dt) ORDER BY amount DESC) amount_rankFROM (SELECTs.dt, sum(amount) amountFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY s.dt) tmp
)
SELECTyear(dt) year,dt,round(amount, 2) max_amount_day
FROMtmp1
WHERE amount_rank = 1
;
  1. 年度最畅销的商品(即每年销售金额最大的商品)
WITH tmp AS (SELECTyear(s.dt) year, sd.goods, round(sum(amount), 2) amount_goodsFROMsaledetail sdJOIN sale sON sd.orderid = s.orderidGROUP BY year(s.dt), sd.goods
)
SELECTyear, goods, amount_goods
FROM (SELECTyear, goods, amount_goods, row_number() OVER (PARTITION BY year ORDER BY amount_goods DESC) rankFROMtmp
) tmp2
WHERE rank = 1
;

Hue 数据交互工具

1. 安装、编译

  1. 解压 hue

  2. 安装依赖

yum install ant asciidoc cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain gcc gcc-c++ krb5-devel libffi-devel libxml2-devel libxslt-devel make mysql mysql-devel openldap-devel python-devel sqlite-devel gmp-devel

  1. 安装 maven

解压 maven 压缩包,添加环境变量

  1. 编译
# 进入 hue 源码目录
# PREFIX 指定安装 hue 的路径
cd /opt/software/hue-release-4.3.0PREFIX=/opt/lagou/servers/ make install
  1. 修改集群配置
<!-- hdfs-site.xml -->
<!-- HUE -->
<property> <name>dfs.webhdfs.enabled</name><value>true</value>
</property><property> <name>dfs.permissions.enabled</name><value>false</value>
</property><!-- core-site -->
<!-- HUE -->
<property> <name>hadoop.proxyuser.hue.hosts</name> <value>*</value>
</property><property><name>hadoop.proxyuser.hue.groups</name><value>*</value>
</property><property> <name>hadoop.proxyuser.hdfs.hosts</name> <value>*</value>
</property><property><name>hadoop.proxyuser.hdfs.groups</name><value>*</value>
</property>

增加 httpfs-site.xml 配置文件

<configuration><!-- HUE --><property> <name>httpfs.proxyuser.hue.hosts</name> <value>*</value></property><property><name>httpfs.proxyuser.hue.groups</name><value>*</value></property>
</configuration>
  1. 修改 Hue 配置
cd /opt/lagou.servers/hue
cd desktop/confcp pseudo-distributed.ini.tmpl pseudo-distributed.ini
vi vi pseudo-distributed.ini# 修改如下参数# [desktop]http_host=linux122http_port=8000is_hue_4=truetime_zone=Asia/Shanghaidev=trueserver_user=hueserver_group=huedefault_user=hue# 211行左右。禁用solr,规避报错 
app_blacklist=search# [[database]]。Hue默认使用SQLite数据库记录相关元数据,替换为mysqlengine=mysqlhost=linux123port=3306user=hivepassword=12345678name=hue# 1003行左右,Hadoop配置文件的路径
hadoop_conf_dir=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop# 在mysql中创建数据库hue,用来存放元数据 mysql -uhive -p12345678
mysql> create database hue;# hue 命令, 初始化数据库 
build/env/bin/hue syncdb 
build/env/bin/hue migrate# 增加 hue 用户和用户组 
groupadd hue 
useradd -g hue hue# 运行监控进场,查看 hue 进程
./supervisor

2. 整合 Hadoop, Hive

# 没有安装 Solr,禁用,否则一直报错 
app_blacklist=search
# [hadoop] -- [[hdfs_clusters]] -- [[[default]]] 
# 注意端口号。下面语句只要一个
# fs_defaultfs=hdfs://linux121:8020 
fs_defaultfs=hdfs://linux121:9000
webhdfs_url=http://linux121:50070/webhdfs/v1hadoop_conf_dir=/opt/lagou/servers/hadoop-2.9.2/etc/hadoop# [hadoop] -- [[yarn_clusters]] -- [[[default]]]
resourcemanager_host=linux123
resourcemanager_port=8032
submit_to=True
resourcemanager_api_url=http://linux123:8088
proxy_api_url=http://linux123:8088
history_server_api_url=http://linux123:19888

开启 hue 的 WebUI(linux122:8000) 前,需要启动集群,以及 hiveserver2

本文标签: Hive Hue