python操作hive和hive_sql语句
Hive是⼀个数据仓库基础的应⽤⼯具,在Hadoop中⽤来处理结构化数据,通过类SQL语⾔对数据进⾏操作。Hive将sql语句通过解析器转换成MapReduce作业提交到Hadoop集上,Hadoop监控作业执⾏过程,并将执⾏结果返回给⽤户。
值得注意的是,Hive并不⽀持⾏级数据的更新,主要使⽤场合为⼤数据集的批处理作业中。
下⾯为Hive中常⽤的SQL语句,‘[ ]’中的内容根据实际需求来确定要不要写。
复制代码
– 创建数据库
create database name;
– 常⽤显⽰命令
show databases; – 查看有哪些数据库
show tables; – 查看当前数据库下有哪些表
show tables like ‘cc’ – 正则表达式显⽰表
show partitions; – 查看分区
python新手代码useridshow functions;
describe extended table_name; – 查看表的结构,字段,分区等情况
– 建表语句
create [external] table [if not exists] table_name --创建表,指定表名,默认为内部表
[(col_name data_type [comment col_comment], …)] – 创建字段,指定字段类型、注释
[comment table_comment] – 表的注释
[partitioned by (col_name data_type [comment col_comment], col_name_2 data_type_2, …)] – 指定分区,要注意分区字段不能出现的建表的字段中
[clustered by (col_name, col_name_2, …)] [sorted by (col_name [ASC|DESC], …)] into num_buckets buckets] – 分桶
[row format row_format]
[stored as file_format] – 指定存储⽂件类型
[location hdfs_path] – 存储路径
·external 表⽰创建的表是否为外部表,默认为内部表
·if not exists 表⽰该表不存在时创建该表,否则忽略异常
·comment 为表、字段增加注释
·row_format
row format delimited [fields terminated by char]
[collection items terminated by char]
[map keys terminated by char]
[lines terminated by char]
·
file_format
stored as textfile – 纯⽂本数据
stored as sequencefile – 数据需要压缩,节省存储空间
– like关键字复制表结构
create table table_name like old_table_name;
– 更改表名
alter table table_name rename to new_table_name;
– 增加⼀个字段 并 添加注释
alter table table_name add columns (col_name data_type comment ‘col_comment’);
– 删除列
alter table table_name replace columns (col_name data_type, col_name_2 data_type_2);
– 增加、删除分区
alter table table_name add [if not exists] partition_name; – 增加
alter table table_name drop partition_name, partition_name_2; – 删除
复制代码
复制代码
– 插⼊数据
insert into table_1 select * from table_2; – 在table_1后追加数据
insert overwrite table_1 select * from table_2; – 先将table_1中数据清空,然后添加数据
– 提取数据常⽤语句
select [distinct] select_expr_1, select_expr_2
from table_name
[where condition] – 筛选条件
[group by col_list [having condition]] – 分组、分组返回的条件
[order by col_list] – 排序
[limit num_1, num_2] – 返回数据的起始位置(num_1)以及返回数据的记录数(num_2)
复制代码
⼀、前⾔
做⼤数据分析及应⽤过程中,时常需要⾯对海量的数据存储及计算,传统的服务器已经很难再满⾜⼀些运算需求,基于hadoop/spark的⼤数据处理平台得到⼴泛的应⽤。本⽂介绍⽤python读取hive数据库的⽅式,其中还是存在⼀些坑,这⾥我也把⾃⼰遇到的进⾏分享交流。
基本情况
集团有20台服务器(其中1台采集主节点,1台⼤数据监控平台,1台集主节点,17台集节点),65THDFS的磁盘资源,3.5T的yarn 内存,等等。项⽬⽬前需要对集团的家庭画像数据分析,通过其楼
盘,收视节⽬偏好,家庭收⼊等数据进⾏区域性的分析;同时对节⽬画像及楼盘详细数据进⾏判断分析。本⼈习惯使⽤R语⾔和Python来分析,故采⽤了本次分享的数据获取部分的想法。
⼆、Python连接hive
1、pyhive⽅式连接hive数据库
⾸先是配置相关的环境及使⽤的库。sasl、thrift、thrift_sasl、pyhive。
其中sasl是采⽤0.2.1版本的,选择适合⾃⼰的即可。下载⽹址:www.lfd.uci.edu/~gohlke/pythonlibs/#sasl
from pyhive import hive
import pandas as pd
读取数据
def select_pyhive(sql):
# 创建hive连接
conn = hive.Connection(host=‘10.16.15.2’, port=10000, username=‘hive’, database=‘user’)
cur = conn.cursor()
try:
#c = cur.fetchall()
df = pd.read_sql(sql, conn)
return df
finally:
if conn:
conn.close()
sql = “select * from user_huaxiang_wide_table”
df = select_pyhive(sql)
获取到hive数据库中约193W的家庭画像数据,37个字段。
可以看出代码并不是很复杂,但是⼤家在测试时可能会出现以下两种常见的问题。
问题⼀:
‘TSaslClientTransport’ object has no attribute ‘readAll’
解决⼀:
pip install thrift_sasl==0.3.0 -i pypi.tuna.tsinghua.edu/simple,更新依赖thrift_sasl包到0.3.0即可
问题⼆:Could not start SASL: Error in sasl_client_start (-4) SASL(-4)
解决⼆:
1.寻到sasl的安装位置,⼀般来说是如下位置:
C:\Users\你计算机的⽤户名字\AppData\Local\Programs\Python\Python37-32\Lib\site-packages\sasl\sasl2
2. C盘新建⽂件夹 C:\CMU\bin\sasl2
3. 将第⼀步中的saslPLAIN.dll拷贝⾄第⼆步新建的⽂件夹中
2、impala⽅式连接hive数据库
impala⽅式连接hive数据库,但是数据量过⼤会导致python卡死,⽬前还未到合适⽅式解决。
⾸先是配置相关的环境及使⽤的库。sasl、thrift、thrift_sasl、impala。
其中sasl是采⽤0.2.1版本的,选择适合⾃⼰的即可。下载⽹址:www.lfd.uci.edu/~gohlke/pythonlibs/#sasl
from impala.dbapi import connect
from impala.util import as_pandas
import pandas as pd
获取数据
def select_hive(sql):
# 创建hive连接
conn = connect(host=‘10.16.15.2’, port=10000, auth_mechanism=‘PLAIN’,user=‘hive’, password=‘user@123’, database=‘user’)
cur = conn.cursor()
try:
#ute(sql)
c = cur.fetchall()
df = as_pandas(cur)
return df
finally:
if conn:
conn.close()
data = select_hive(sql = ‘select * from user_huaxiang_wide_table limit 100’)
这个impala⽅式也是很⽅便,但是当数据量到达⼀定程度,则就会在fetchall处⼀直处于运⾏状态,⼏个⼩时也没有响应。
使⽤ python 操作 hadoop 好像只有 少量的功能,使⽤python 操作 hive 其实还有⼀个hiveserver 的⼀个包,不过 看这个 pyhive 应该是⽐较好⽤的。
安装依赖
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
操作
from pyhive import hive
conn = hive.Connection(host=‘xxxx’, port=10000, username=‘xxx’, database=‘default’)
for result in cursor.fetchall():
print result
##真实 内⽹ 测试
from pyhive import hive
conn = hive.Connection(host=‘172.16.16.32’, port=10000,
username=‘zhuzheng’,auth=‘LDAP’,password=“abc123.” ,database=‘fkdb’)
cursor=conn.cursor()
for result in cursor.fetchall():
print(result)
###如果 hive 有账号密码 你需要 写上,如果 hive 不在 同⼀台机器 也要写明 ip 和port,
###授权模式 需要选择合适的,我这⾥使⽤的上 LDAP, 数据库呢 ,你 需要连接你⾃⼰ 正确的。
####其实在捣⿎是 各种报错,有账号密码 写错 和 授权模式错误 数据库不存在 ,thift 报错 等的,整的⼈⼼ 烦躁
from impala.dbapi import connect
#需要注意的是这⾥的auth_mechanism必须有,但database不必须
conn = connect(host=‘127.0.0.1’, port=10000, database=‘default’, auth_mechanism=‘PLAIN’)
cur = conn.cursor()
print(cur.fetchall())
pip uninstall impyla
pip install thrift==0.9.3
pip install impyla==0.13.8
from pyhive import hive
conn = hive.Connection(host=“YOUR_HIVE_HOST”, port=PORT, username=“YOU”)
cursor = conn.cursor()
for result in cursor.fetchall():
use_result(result)
import pandas as pd
df = pd.read_sql(“SELECT cool_stuff FROM hive_table”, conn)
from pyhive import hive
import pandas as pd
def getData():
conn = hive.Connection(host=“1.0.1.38”, auth=“CUSTOM”, username=‘hive’, password=“pvXxHTsdqrt8”, port=10000, database=‘tapro_atg’)
df = pd.read_sql(“select * from sales_data_leisure_view”, conn)
records = df.head(n=100000)
_json(orient=‘records’))
getData();
import pandas as pd
from pyhive import hive
conn = t(‘192.168.72.135’)
cursor = conn.cursor()
sql = “select * from t2 where city=‘Shanghai’”
res = cursor.fetchall()
df = pd.DataFrame(res, columns=[‘id’, ‘name’, ‘year’, ‘city’])
df1 = pd.read_sql(sql, conn, chunksize=3)
for chunk in df1:
print(chunk)
-- coding:utf-8 --
import pandas as pd
from pyhive import hive
import time
import datetime
import os
def rfail(s, file_path):
with open(file_path, “a+”) as f:
f.write(s + “\n”)
def read_query(sql):
hive_line = ‘’‘hive -e “set hive.cli.print.header=true;set mapreduce.job.queuename=hl_report;%s”;’’’ % (sql) data_buffer = os.popen(hive_line)
data = pd.read_table(data_buffer, sep="\t", chunksize=10000)
return data
def get_from_hive(query, mode, engine_hive):
#engine_hive = hive.Connection(host=“xxxxx”, port=10000, username=“xxxx”)
if mode == “pyhive”:
data = pd.read_sql(query, engine_hive)
return data
elif mode == “raw”:
data = read_query(query)
return data
else:
print(“mode: pyhive or raw”)
return None
def gen_date(bdate, days):
day = datetime.timedelta(days=1)
for i in range(days):
s = bdate + day * i
# print(type(s))
yield s.strftime("%Y-%m-%d")
def get_date_list(start=None, end=None):
if (start is None) | (end is None):
return []
else:
data = []
for d in gen_date(start, (end - start).days):
data.append(d)
return data
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论