PyFlink connecting to MySQL — PyFlink 1.11.0 usage examples (Python 3.5 or above required)
Install PyFlink:
python -m pip install apache-flink
If the download is too slow and times out, switch to another pip mirror.
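For example, you can point pip at a different index with the -i flag; the Tsinghua mirror below is just one commonly used public mirror, substitute whichever index you prefer:
python -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple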
Code
The Table API approach
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
# The connectors live in the descriptors module; check there to see which parameters each one needs
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
# Write and register a UDF; not used for now
# t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
#add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
# t_env.register_function("add", add)
# Read from Kafka
properties = {
"t" : "172.17.0.2:2181",
"bootstrap.servers" : "172.17.0.2:9092",
mysql下载链接"group.id" : "flink-test-cxy"
}
t(Kafka().properties(properties).version("universal").topic("mytesttopic").start_from_latest()) \
.with_format(Json()).with_schema(Schema() \
.field('a', DataTypes.BIGINT()) \
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')
# Read from CSV (alternative source)
# t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_')) \
# .with_format(OldCsv()
# .field('a', DataTypes.BIGINT())
# .field('b', DataTypes.BIGINT())) \
# .with_schema(Schema()
# .field('a', DataTypes.BIGINT())
# .field('b', DataTypes.BIGINT())) \
# .create_temporary_table('mySource')
# Write to CSV
t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/pyflink_')) \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
# Sum fields a and b from the Kafka data and insert the result into the sink
t_env.from_path('mySource')\
.select("(a+b)") \
.insert_into('mySink')
t_env.execute("job_test")
Testing
Open a Kafka producer and send some data.
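For example, with the console producer that ships with Kafka (broker address and topic taken from the config above; on newer Kafka versions the flag is --bootstrap-server rather than --broker-list):
kafka-console-producer.sh --broker-list 172.17.0.2:9092 --topic mytesttopic
>{"a": 1, "b": 2}
>{"a": 3, "b": 4}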
Result
(Screenshot: the received data)
Using Table SQL
For example, when creating the source, use sql_update:
t_env.sql_update("""
CREATE TABLE mySource (
a bigint,
b bigint
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'mytesttopic',
'connector.properties.zookeeper.connect' = '172.17.0.2:2181',
'connector.properties.bootstrap.servers' = '172.17.0.2:9092',
'connector.properties.group.id' = 'flink-test-cxy',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
""")
Using a UDF
Create and register it
# Write and register the UDF
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
Usage
t_env.sql_update("insert into mySink select add(a,b) from mySource")
Another example
kafka2mysql
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem, Kafka, Json
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.sql_update("""
CREATE TABLE mySource (
a bigint,
b bigint
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'mytesttopic',
'connector.properties.zookeeper.connect' = '172.17.0.2:2181',
'connector.properties.bootstrap.servers' = '172.17.0.2:9092',
'connector.properties.group.id' = 'flink-test-cxy',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
)
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id bigint,
game_id varchar
)
with (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://:3306/flinksql?useSSL=false',
'connector.username' = 'x',
'connector.password' = 'x',
'connector.table' = 'mysqlsink',
'connector.driver' = 'com.mysql.cj.jdbc.Driver',
'connector.write.flush.interval' = '5s',
'connector.write.flush.max-rows' = '1'
)
""")
t_env.sql_update("insert into mysqlsink select a , cast(b as varchar) b from mySource") ute("job")
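The JDBC connector writes into an existing table, so the flinksql database on the MySQL side needs a matching mysqlsink table first; a minimal sketch of that MySQL-side DDL (the varchar length is an assumption, adjust to your data):
CREATE TABLE mysqlsink (
  id BIGINT,
  game_id VARCHAR(255)
);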
