如何使用 Python 操作 Kudu
文章目录
本文只讨论如何使用 Kudu 提供的 Python API,不涉及 Kudu 自身环境的搭建和配置。
环境准备
注意:在安装 kudu-python 之前,需要先确保已经配置好 Kudu 的 C++ Client Libraries;不同操作系统所需的依赖需要分别配置,这里只讨论 Ubuntu 和 CentOS。
C++ Client Libraries
详情请参考官网的安装文档:https://kudu.apache.org/docs/installation.html
Ubuntu
# Install the packages needed to build Kudu from source on Ubuntu.
sudo apt-get -y install autoconf automake curl flex g++ gcc gdb git \
  krb5-admin-server krb5-kdc krb5-user libkrb5-dev libsasl2-dev libsasl2-modules \
  libsasl2-modules-gssapi-mit libssl-dev libtool lsb-release make ntp \
  openjdk-8-jdk openssl patch pkg-config python rsync unzip vim-common

# Fetch the Kudu sources.
git clone https://github.com/apache/kudu
cd kudu

# Build the bundled third-party dependencies (skipped when already built).
thirdparty/build-if-necessary.sh

# Configure and build a release build with the bundled cmake.
mkdir -p build/release
cd build/release
../../thirdparty/installed/common/bin/cmake \
  -DCMAKE_BUILD_TYPE=release ../..
make -j4
Centos
官网上 devtoolset-3 的安装方法已经过期了,centos-release-scl 不再维护 CentOS 7 以下的版本,因此这里改用 devtoolset-8。
# Install the packages needed to build Kudu from source on CentOS.
# centos-release-scl-rh is installed here because it provides the yum
# repository that devtoolset-8 comes from.
sudo yum -y install autoconf automake cyrus-sasl-devel cyrus-sasl-gssapi \
  cyrus-sasl-plain flex gcc gcc-c++ gdb git java-1.8.0-openjdk-devel \
  krb5-server krb5-workstation libtool make openssl-devel patch pkgconfig \
  redhat-lsb-core rsync unzip vim-common which centos-release-scl-rh

# Install devtoolset-8 in a second step, once the SCL repository above is
# available to yum.
sudo yum -y install devtoolset-8

# Fetch the Kudu sources.
git clone https://github.com/apache/kudu
cd kudu

# Build the bundled third-party dependencies with the devtoolset compiler.
build-support/enable_devtoolset.sh thirdparty/build-if-necessary.sh

# Configure and build a release build with the bundled cmake.
mkdir -p build/release
cd build/release
../../build-support/enable_devtoolset.sh \
  ../../thirdparty/installed/common/bin/cmake \
  -DCMAKE_BUILD_TYPE=release \
  ../..
make -j4
kudu-python
首次安装
安装完 kudu-python 所需的相关依赖后,先不要急于安装 kudu-python 的包。上面编译出来的 kudu 目录实在是太大了,大约有 60G,不可能每次都重新安装或者拷贝一遍。我们先把 kudu 移动到一个方便的目录下,并且保证之后每台机器的 kudu 依赖都安装在同一路径,然后设置 KUDU_HOME 指向该目录,随后就可以开始 pip install kudu-python 了。
安装命令运行完后,记得把生成的 wheel 文件的位置记录下来,其他机器上 kudu-python 的安装都要依靠这个文件。
复用安装
光有一个 wheel 文件是不够的:虽然能够成功安装 kudu-python,但在真正运行的时候会出现找不到 so 文件的异常。这时需要在之前安装 kudu 的机器上,将 $KUDU_HOME/build/latest/lib/exported 下的所有文件拷贝到目标机器的同样目录下,再用 pip install 安装该 wheel 文件,之后即可正常运行。
需要注意的是,Ubuntu 和 CentOS 之间不能互相使用对方的 wheel 文件和 so 文件,需要各自进行上述操作。但是它们的 wheel 文件名是一样的,使用 docker 安装的小伙伴如果要将其拷贝到宿主机,不要把两者放在同一目录下。
使用
import kudu
from kudu.client import Partitioning
from kudu.schema import Schema
# Maps the type names used in column definitions (plain strings) to the
# corresponding type objects exposed by the kudu module.
type_mapper = {
    "int8": kudu.int8,
    "int16": kudu.int16,
    "int32": kudu.int32,
    "int64": kudu.int64,
    "float": kudu.float,
    "double": kudu.double,
    "decimal": kudu.decimal,
    "binary": kudu.binary,
    "string": kudu.string,
}
class KuduClient:
    """Small convenience wrapper around the kudu-python client.

    The class keeps one shared instance in ``_instance``: constructing
    ``KuduClient`` again returns that instance and does not reconnect.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        # Store the instance on first construction so later calls reuse it.
        if cls._instance is None:
            cls._instance = object.__new__(cls)
        return cls._instance

    def __init__(self, host="127.0.0.1", port=7051):
        """Connect to the Kudu master and open a session.

        :param host: Kudu master host (replaces the unfilled ``{kudu_host}``
            placeholder of the original listing).
        :param port: Kudu master RPC port (replaces ``{kudu_port}``).
        """
        # __init__ runs on every KuduClient(...) call, even when __new__
        # hands back the cached instance; do not reconnect in that case.
        if getattr(self, "client", None) is not None:
            return
        # Connect to Kudu master server
        self.client = kudu.connect(host=host, port=port)
        # The session has no close method, but an expiry time can be set.
        self.session = self.client.new_session()

    @staticmethod
    def builder() -> kudu.schema:
        """Return a new, empty schema builder."""
        return kudu.schema_builder()

    @staticmethod
    def schema(builder: kudu.schema, columns: list) -> kudu.schema:
        """Add ``columns`` to ``builder`` and build the schema.

        :param builder: schema builder, e.g. from :meth:`builder`
        :param columns: list of column definitions, for example::

            [
                {
                    "name": "student_no",
                    "type": "int32",
                    "nullable": False,
                    "primary_key": True
                }, {
                    "name": "age",
                    "type": "int8",
                    "nullable": False,
                    "primary_key": True
                }, {
                    "name": "name",
                    "type": "string",
                    "nullable": True
                }, {
                    "name": "gender",
                    "type": "string",
                    "nullable": True
                }
            ]

            Optional keys read per column: ``compression``, ``encoding``,
            ``default``, ``block_size``, ``precision``, ``scale``.
        :return: the result of ``builder.build()``
        """
        primary_key = []
        for column in columns:
            if column.get("primary_key"):
                primary_key.append(column.get("name"))
            builder.add_column(
                column.get("name"),
                type_=type_mapper.get(column.get("type")),
                # A column is nullable unless the definition says otherwise;
                # "nullable": False must yield a NOT NULL column.
                nullable=bool(column.get("nullable", True)),
                compression=column.get("compression"),
                encoding=column.get("encoding"),
                default=column.get("default"),
                block_size=column.get("block_size"),
                precision=column.get("precision"),
                scale=column.get("scale"),
            )
        builder.set_primary_keys(primary_key)
        return builder.build()

    @staticmethod
    def _range_partition_kwargs(bound: dict) -> dict:
        """Turn a ``bound`` dict into range-partition keyword arguments.

        Missing bound types fall back to ``"inclusive"`` (lower) and
        ``"exclusive"`` (upper).
        """
        return {
            "lower_bound": bound.get("lower_bound"),
            "upper_bound": bound.get("upper_bound"),
            "lower_bound_type": bound.get("lower_bound_type") or "inclusive",
            "upper_bound_type": bound.get("upper_bound_type") or "exclusive",
        }

    @staticmethod
    def partition(hash_columns: list, range_columns: list = None, bound: dict = None, bucket_num=3) -> Partitioning:
        """Define the partitioning schema for a new table.

        :param hash_columns: columns to hash-partition on; one
            ``add_hash_partitions`` call is made per entry
        :param range_columns: columns for range partitioning; skipped when
            ``None``
        :param bound: dict with ``lower_bound`` / ``upper_bound`` (and
            optionally ``lower_bound_type`` / ``upper_bound_type``); no range
            partition is added when it is ``None`` or empty
        :param bucket_num: number of buckets for each hash partition
        :return: the populated ``Partitioning`` object
        """
        partition = Partitioning()
        for column in hash_columns:
            partition.add_hash_partitions(column_names=column, num_buckets=bucket_num)
        # Both arguments default to None, so only touch range partitioning
        # when the caller actually supplied it.
        if range_columns is not None:
            partition.set_range_partition_columns(range_columns)
        if bound:
            partition.add_range_partition(**KuduClient._range_partition_kwargs(bound))
        return partition

    def add_column(self, table: kudu.Table, column: dict) -> None:
        """Add one column to an existing table.

        :param table: table to alter
        :param column: column definition with ``name``, ``type`` and optional
            ``nullable``, ``compression``, ``encoding``, ``default``
        :return: None
        """
        alter = self.client.new_table_alterer(table)
        alter.add_column(
            column.get("name"),
            type_=type_mapper.get(column.get("type")),
            # Nullable unless the definition explicitly says otherwise.
            nullable=bool(column.get("nullable", True)),
            compression=column.get("compression"),
            encoding=column.get("encoding"),
            default=column.get("default"),
        )
        alter.alter()

    def add_range_partition(self, table: kudu.Table, bound: dict) -> None:
        """Add a range partition to an existing table.

        :param table: table to alter
        :param bound: for example::

            {
                "lower_bound": {"create_time": datetime.now().strftime("%Y-%m-%d 00:00:00")},
                "upper_bound": {"create_time": datetime.now().strftime("%Y-%m-%d 23:59:59")}
            }
        :return: None
        """
        alter = self.client.new_table_alterer(table)
        alter.add_range_partition(**self._range_partition_kwargs(bound))
        alter.alter()

    def drop_range_partition(self, table: kudu.Table, bound: dict) -> None:
        """Drop the range partition described by ``bound`` from ``table``.

        ``bound`` has the same shape as for :meth:`add_range_partition`.
        """
        alter = self.client.new_table_alterer(table)
        alter.drop_range_partition(**self._range_partition_kwargs(bound))
        alter.alter()

    def show_tables(self) -> list:
        """Return the result of the client's ``list_tables()``."""
        return self.client.list_tables()

    def create_table(self, table_name: str, schema: kudu.schema, partition: Partitioning, replica=3) -> None:
        """Create a new table.

        :param table_name: name of the table to create
        :param schema: schema, e.g. from :meth:`schema`
        :param partition: partitioning, e.g. from :meth:`partition`
        :param replica: number of replicas requested for the table
        """
        # Create new table
        self.client.create_table(table_name, schema, partition, n_replicas=replica)

    def drop_table(self, table_name: str) -> None:
        """Delete the table called ``table_name``."""
        self.client.delete_table(table_name)

    def table(self, table_name: str) -> kudu.Table:
        """Open and return the table called ``table_name``."""
        # Open a table
        return self.client.table(table_name)

    def insert(self, table: kudu.Table, rows: list):
        """Insert ``rows`` into ``table`` and flush the session once.

        :param table: target table
        :param rows: [{"student_no": 11, "age": 12, "name": "amy"}]
        :return: ``None`` when the flush succeeds; otherwise the session's
            pending errors
        """
        for row in rows:
            op = table.new_insert(row)
            self.session.apply(op)
        try:
            self.session.flush()
        except kudu.KuduBadStatus:
            return self.session.get_pending_errors()

    @classmethod
    def __del(cls):
        # Forget the cached instance so the next construction starts fresh.
        cls._instance = None

    def __del__(self):
        self.__del()
if __name__ == '__main__':
    import time
    from datetime import datetime

    # End-to-end demo: build a schema and partitioning, (re)create the table,
    # then insert one row.
    client = KuduClient()
    builder = client.builder()
    columns = [
        {
            "name": "student_no",
            "type": "int32",
            "nullable": False,
            "primary_key": True
        }, {
            "name": "age",
            "type": "int8",
            "nullable": False,
            "primary_key": True
        }, {
            "name": "create_time",
            "type": "string",
            "nullable": False,
            "primary_key": True
        }, {
            "name": "name",
            "type": "string",
            "nullable": True
        }, {
            "name": "gender",
            "type": "string",
            "nullable": True
        }
    ]
    # bound must be written as a dict; per the author, using a list
    # unexpectedly adds the first declared primary key to the range partition.
    now = datetime.now()
    bound = {
        "lower_bound": {"create_time": now.strftime("%Y-%m-%d 00:00:00")},
        "upper_bound": {"create_time": now.strftime("%Y-%m-%d 23:59:59")}
    }
    schema = client.schema(builder, columns)
    partition = client.partition(["student_no", "age"], ["create_time"], bound)
    print(partition.__dict__)

    # Start from a clean table: drop a leftover one from a previous run, then
    # create it before opening it.
    if "python_kudu_test" in client.show_tables():
        client.drop_table("python_kudu_test")
    client.create_table("python_kudu_test", schema, partition)
    table = client.table("python_kudu_test")
    # client.add_range_partition(table, bound)

    # Make sure the inserted data is inside the range partition; per the
    # author, an out-of-range row is not inserted and no exception is raised.
    # An empty create_time sorts before the lower bound, so use the current
    # timestamp (the upper bound 23:59:59 itself is exclusive).
    errors = client.insert(
        table,
        [{
            "student_no": 11,
            "age": 12,
            "name": "amy",
            "create_time": now.strftime("%Y-%m-%d %H:%M:%S"),
        }],
    )
    if errors:
        print(errors)
    # print(client.show_tables())
总结
以上仅是个人在使用 kudu-python 时踩坑的一些心得,如果有没注意到的地方,欢迎各位指正。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。