Python 通过 CSV 插入数据到 ClickHouse
第一步:连接 ClickHouse
第二步:读取 CSV
第三步:转换成与 ClickHouse 列类型相匹配的数据
第四步:插入,3 万行耗时 2.3 s
"""Bulk-load a CSV file into the ClickHouse table ``test1``.

The script connects to ClickHouse, reads the CSV, converts every field to the
Python type that matches its ClickHouse column, and inserts all rows with a
single ``INSERT`` statement.
"""
import csv
import datetime
import time
import types
from datetime import date

# How Python reads the CSV data: every field arrives as ``str`` and is
# converted by ``convert_row`` before it is handed to the driver.
CSV_PATH = r'C:\Users\Administrator\Desktop\test.csv'

# Column positions (0-based) grouped by the conversion they need.
INT_COLUMNS = (0, 1, 2, 5, 7, 11, 12, 13)
DATETIME_COLUMNS = (9, 10)
DATETIME_FORMAT = '%Y/%m/%d %H:%M'

# IF NOT EXISTS keeps a second run from failing on table creation, which would
# otherwise be swallowed by the error handler in main() and skip the insert.
# NOTE(review): MergeTree(date, key, granularity) is the legacy engine syntax;
# newer ClickHouse servers may require the PARTITION BY / ORDER BY form --
# confirm against the target server version.
creattable = """CREATE TABLE IF NOT EXISTS test1 (\
    consumption_id UInt64,\
    member_id UInt64,\
    fans_id UInt64,\
    bill_date Date,\
    money Float32,\
    people_num UInt8,\
    dish_name String,\
    created_org UInt8,\
    open_id String,\
    subscribed_time DateTime,\
    unsubscribed_time DateTime,\
    source_type UInt8,\
    sns_type UInt8,\
    is_subscribed UInt8\
    )ENGINE=MergeTree(bill_date,(consumption_id,created_org),8192)"""


def convert_row(row):
    """Convert one CSV record in place to the types of table ``test1``.

    Args:
        row: list of 14 strings in table column order. Column 3 is a
            ``YYYY/M/D`` date; columns 9 and 10 are ``YYYY/M/D H:MM``
            timestamps.

    Returns:
        The same list object, with int, float, ``date`` and ``datetime``
        values in place of the strings (columns 6 and 8 stay ``str``).

    Raises:
        ValueError: a field cannot be parsed as its target type.
        IndexError: the record has fewer columns than expected.
    """
    for index in INT_COLUMNS:
        row[index] = int(row[index])
    year, month, day = row[3].split('/')[:3]
    row[3] = date(int(year), int(month), int(day))
    row[4] = float(row[4])
    for index in DATETIME_COLUMNS:
        row[index] = datetime.datetime.strptime(row[index], DATETIME_FORMAT)
    return row


def load_rows(path=CSV_PATH):
    """Read the CSV at *path* and return its records converted for insertion.

    Blank lines (which ``csv.reader`` yields as empty lists) are skipped.
    """
    # newline='' lets the csv module handle line endings itself, so embedded
    # newlines inside quoted fields are read correctly.
    with open(path, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        return [convert_row(row) for row in reader if row]


def main():
    """Create table ``test1`` if needed, insert the CSV rows, print the count."""
    # Imported here so the conversion helpers above can be used (and tested)
    # on machines where the ClickHouse driver is not installed.
    from clickhouse_driver import Client

    client = Client(host='192.168.1.231', database='test6', user='default', password='')
    data = load_rows()
    if not data:
        print('no rows found in', CSV_PATH)
        return

    try:
        # Show one converted record and the Python types sent to the driver.
        print(data[0])
        print(*(type(value) for value in data[-1]))
        client.execute(creattable)
        client.execute('INSERT INTO test1 VALUES', data, types_check=True)
        print(client.execute('select count(1) from test1'))
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()
——————————————————————————————————————————————————
下面是 Python 读取 MySQL 的表数据存为 CSV,再插入到 ClickHouse:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Copy rows from a MySQL table into the ClickHouse table ``test1`` via CSV.

The script dumps the MySQL query result to a CSV file, reads the file back,
converts every field to the Python type that matches its ClickHouse column,
and inserts all rows with a single ``INSERT`` statement.
"""
import csv
import datetime
import re
import time
import types
import warnings
from datetime import date

# Raw string: backslashes in a Windows path must not be read as escapes.
CSV_PATH = r'D:\python\csvtest.csv'

# Column positions (0-based) grouped by the conversion they need.
INT_COLUMNS = (0, 1, 2, 5, 7, 11, 12, 13)
DATETIME_COLUMNS = (9, 10)

# Compiled once; matches each run of digits in a timestamp.
_NUMBER = re.compile(r"\d+")

# IF NOT EXISTS keeps a second run from failing on table creation, which would
# otherwise be swallowed by the error handler in main() and skip the insert.
# NOTE(review): MergeTree(date, key, granularity) is the legacy engine syntax;
# newer ClickHouse servers may require the PARTITION BY / ORDER BY form --
# confirm against the target server version.
creattable = """CREATE TABLE IF NOT EXISTS test1 (\
    consumption_id UInt64,\
    member_id UInt64,\
    fans_id UInt64,\
    bill_date Date,\
    money Float32,\
    people_num UInt8,\
    dish_name String,\
    created_org UInt8,\
    open_id String,\
    subscribed_time DateTime,\
    unsubscribed_time DateTime,\
    source_type UInt8,\
    sns_type UInt8,\
    is_subscribed UInt8\
    )ENGINE=MergeTree(bill_date,(consumption_id,created_org),8192)"""


def export_mysql_to_csv(path=CSV_PATH):
    """Write the first 1000 rows of MySQL table ``bigtable`` to *path* as CSV.

    Query and file errors are printed rather than raised; the connection is
    closed in every case.
    """
    # Imported here so the conversion helpers below can be used (and tested)
    # on machines where the MySQL driver is not installed.
    import pymysql

    connection = pymysql.connect(host='192.168.1.235', port=3306, user='root',
                                 password='123456', db='0001790455_pos',
                                 charset="utf8")
    try:
        with connection.cursor() as cursor:
            cursor.execute("select *from bigtable limit 1000")
            rows = list(cursor.fetchall())
        with open(path, 'w', newline='') as csvfile:
            csv.writer(csvfile).writerows(rows)
    except Exception as e:
        print(e)
    finally:
        connection.close()


def _parse_datetime(text):
    """Build a ``datetime`` from the first six numbers found in *text*.

    Only year, month, day, hour, minute and second are used, so a fractional
    seconds suffix such as ``.250000`` is ignored instead of breaking int().
    """
    parts = _NUMBER.findall(text)
    return datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]),
                             int(parts[3]), int(parts[4]), int(parts[5]))


def convert_row(row):
    """Convert one CSV record in place to the types of table ``test1``.

    Args:
        row: list of 14 strings in table column order. Column 3 is a
            ``YYYY-MM-DD`` date; columns 9 and 10 are
            ``YYYY-MM-DD HH:MM:SS`` timestamps.

    Returns:
        The same list object, with int, float, ``date`` and ``datetime``
        values in place of the strings (columns 6 and 8 stay ``str``).

    Raises:
        ValueError: a field cannot be parsed as its target type.
        IndexError: the record has too few columns, or a timestamp has
            fewer than six numeric parts (for example an empty field).
    """
    for index in INT_COLUMNS:
        row[index] = int(row[index])
    year, month, day = row[3].split('-')[:3]
    row[3] = date(int(year), int(month), int(day))
    row[4] = float(row[4])
    for index in DATETIME_COLUMNS:
        row[index] = _parse_datetime(row[index])
    return row


def load_rows(path=CSV_PATH):
    """Read the CSV at *path* and return its records converted for insertion.

    Blank lines (which ``csv.reader`` yields as empty lists) are skipped.
    """
    # newline='' lets the csv module handle line endings itself, so embedded
    # newlines inside quoted fields are read correctly.
    with open(path, newline='') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        return [convert_row(row) for row in reader if row]


def main():
    """Export MySQL rows to CSV, load them into ClickHouse, print the timing."""
    # Imported here so the conversion helpers above can be used (and tested)
    # on machines where the ClickHouse driver is not installed.
    from clickhouse_driver import Client

    warnings.filterwarnings('ignore')
    export_mysql_to_csv()

    client = Client(host='192.168.1.231', database='test6', user='default', password='')
    # The timer starts before the CSV is read, so the reported figure covers
    # parsing, table creation and the insert.
    start = time.time()
    data = load_rows()

    try:
        client.execute(creattable)
        client.execute('INSERT INTO test1 VALUES', data, types_check=True)
        end = time.time()
        print('clickhouse插入时间', end - start)
    except Exception as e:
        print(e)


if __name__ == '__main__':
    main()
同样地,要注意 CSV 数据读取到 Python 后每个字段都是字符串,插入前需要转换成与 ClickHouse 列类型相匹配的 Python 类型。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论