安装
pip install sasl pip install thrift pip install thrift-sasl pip install PyHive
ubuntu安装sasl报错
# 先执行 sudo apt-get install libsasl2-dev # 在安装 pip install sasl
开启HiveServer2
HiveServer2 是一种可选的 Hive 内置服务,可以允许远程客户端使用不同编程语言向 Hive 提交请求并返回结果。HiveServer2 是 HiveServer1 的改进版,主要解决了无法处理来自多个客户端的并发请求以及身份验证问题。
- 我这里在redis02:192.168.92.146上启动的
<property> <name>hive.server2.transport.mode</name> <value>binary</value> <description> Expects one of [binary, http]. Transport mode of HiveServer2. </description> </property> <property> <name>hive.server2.thrift.port</name> <value>10000</value> <description>Port number of HiveServer2 Thrift interface when hive.server2.transport.mode is 'binary'.</description> </property> <property> <name>hive.server2.thrift.bind.host</name> <value>192.168.92.146</value> </property> <property> <name>hive.server2.authentication</name> <value>NOSASL</value> </property>
默认情况下,HiveServer2 启动使用的是默认配置。这些配置主要是服务启动的 Host 和端口号以及客户端和后台操作运行的线程数。我们可以重写 hive-site.xml 配置文件中的配置项来修改 HiveServer2 的默认配置:
配置项 | 默认值 | 说明 |
hive.server2.transport.mode | binary | HiveServer2 的传输模式,binary或者http |
hive.server2.thrift.port | 10000 | HiveServer2 传输模式设置为 binary 时,Thrift 接口的端口号 |
hive.server2.thrift.http.port | 10001 | HiveServer2 传输模式设置为 http 时,Thrift 接口的端口号 |
hive.server2.thrift.bind.host | localhost | Thrift服务绑定的主机 |
hive.server2.thrift.min.worker.threads | 5 | Thrift最小工作线程数 |
hive.server2.thrift.max.worker.threads | 500 | Thrift最大工作线程数 |
hive.server2.authentication | NONE | 客户端认证类型,NONE、LDAP、KERBEROS、CUSTOM、PAM、NOSASL |
hive.server2.thrift.client.user | anonymous | Thrift 客户端用户名 |
hive.server2.thrift.client.password | anonymous | Thrift 客户端密码 |
启动
启动 HiveServer2 非常简单,我们需要做的只是运行如下命令即可:
$HIVE_HOME/bin/hiveserver2 & # 或者 $HIVE_HOME/bin/hive --service hiveserver2 &
检查 HiveServer2 是否启动成功的最快捷的办法就是使用 netstat 命令查看 10000 端口是否打开并监听连接:
netstat -nl | grep 10000
其他
注意:元数据服务和客户端也可配置到一台机器,配置如下
<!-- 数据在hdfs中的存储位置 --> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://redis01:3306/hive?createDatabaseIfNotExist=true</value> <description>Mysql连接协议:JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>JDBC连接驱动:Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>my_user</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>123456</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.uris</name> <value>thrift://redis01:9083</value> </property>
报错
发生异常: OperationalError TOpenSessionResp(status=TStatus(statusCode=3, infoMessages=['*org.apache.hive.service.cli.HiveSQLException:Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: root is not allowed to impersonate my_user:14:13',
解决方法
vim /usr/locl/hadoop/etc/hadoop/core-site.xml <!-- 用户“root”可以代理所有主机上的所有用户 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property>
重启Hadoop集群
使用
注意:hive主要是用来做数据仓库的所以支持查和增数据,默认不能使用改和删数据。
from pyhive import hive conn = hive.Connection(host='192.168.92.146', port=10000, username='root', auth='NOSASL') curs = conn.cursor() # 获取一个游标 sql = 'show databases' # 操作语句 curs.execute(sql) # 执行sql语句 print(curs.fetchall()) # 输出获取结果的所有行
# 查看数据库db_test,下面有哪些表 from pyhive import hive conn = hive.Connection(host='192.168.31.224', port=10000, username='zhong', auth='NOSASL', database='db_test') curs = conn.cursor() # 获取一个游标 sql = 'show tables' # 操作语句 curs.execute(sql) # 执行sql语句 print(curs.fetchall()) # 输出获取结果的所有行
Demo
官方也意识到了hive要访问MySQL数据库之后再访问HDFS,HDFS再访问磁盘的缓慢,所以也支持异步的方式操作hive。官方的实例如下(经过修改):
from pyhive import hive from TCLIService.ttypes import TOperationState conn = hive.Connection(host='192.168.92.146', port=10000, username='root', auth='NOSASL', database='db_test') cursor = conn.cursor() # 获取游标 sql = 'show tables' # sql语句 cursor.execute(sql, async_=True) # 声明是异步函数 status = cursor.poll().operationState # 记录启用循环队列的状态 while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE): # 如果时起始状态或者正在运行状态 logs = cursor.fetch_logs() # 获取运行过程中产生的日志信息,返回值放在列表中 for message in logs: # 迭代日志信息 print(message) status = cursor.poll().operationState # 再次记录启用循环队列的状态 # 获取到结果,循环队列结束 print(cursor.fetchall()) # 输出查询结果
官方并没有详细解释了怎么使用异步方式操作hive,比如,没有解释怎么增加异步运行任务,而且使用的是最原始的循环。如果正常代码中使用该方法进行异步操作,将没有任何意义,还会增加工作量。还不如使用
concurrent.futures对象
的方法将该任务转为异步的进程池或线程池内的子任务。from pyhive import hive from concurrent.futures.thread import ThreadPoolExecutor def func_1(): conn = hive.Connection(host='192.168.92.146', port=10000, username='root', auth='NOSASL', database='db_test') curs = conn.cursor() # 获取一个游标 sql = 'show tables' # 操作语句 curs.execute(sql) # 执行sql语句 print(curs.fetchall()) # 输出获取结果的所有行 if __name__ == "__main__": pool = ThreadPoolExecutor(max_workers=5) # 创建线程池 pool.submit(func_1) # 提交一个任务,第一个参数是普通函数的本身,后面的所有参数都是普通函数的参数
参考