python连接mysql_Flink使⽤python连接mysql Flink 使⽤python连接mysql
mysql连接配置依赖包
1.下载flink-connector-jdbc_
2.11 jar包 ⽹络路径如下
2.将下载jar包放到/flink-1.11.2/lib下
flink-connector-jdbc_2.11:flink-connector-jdbc_2.11-1.11.2.jar
msql的驱动jar包: mysql-connector-java-5.1.47.jar
3.将下载jar包放到/python/site-packages/pyflink/lib下
cp flink-connector-jdbc_2.11-1.11.2.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-
MIKSrtht/lib/python3.7/site-packages/pyflink/lib
cp mysql-connector-java-5.1.47.jar /home/hadoop/.local/share/virtualenvs/pycharm_project_305-
mysql下载配置
MIKSrtht/lib/python3.7/site-packages/pyflink/lib
我们来看个flink连接mysql 的例⼦
1.准备数据
CREATE TABLE `my_mysql_Sink` (
`word` varchar(50) NOT NULL,
`count` bigint(20) DEFAULT NULL,
`ts` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`word`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO `my_mysql_Sink` VALUES ('ceshi',30,'2020-11-02 01:43:33'),('hed',50,'2020-11-02 01:43:33'), ('hellp',30,'2020-11-02 01:43:33');
2.pyflink脚本如下
from pyflink.table import TableConfig,DataTypes,BatchTableEnvironment,TableEnvironment,EnvironmentSettings
env_w_instance().in_batch_mode().use_blink_planner().build()
t_ate( environment_settings=env_seting )
mysql_sink_ddl = """create table sink (`word` VARCHAR,`count` BIGINT,ts timestamp
) with (
'pe' = 'jdbc',
'connector.url' = 'jdbc:mysql://10.106.216.72:3306/test',
'connector.table' = 'my_mysql_Sink',
'connector.username' = 'goodhope',
'connector.password' = '123456',
'connector.write.flush.interval' = '1s')"""
ute_sql(mysql_sink_ddl)
ute_sql("select * from sink").print() #ute("python_job")
3.运⾏结果如下:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论