FlinkSQL读写MySQL
FlinkSQL读取MySQL大多用作维表关联, 聚合结果写入MySQL,简单记录一下用法。
package com.sm.job
import java.time.Duration

import com.sm.utils.FlinkUtils
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.config.ExecutionCheckpointingOptions
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.slf4j.LoggerFactory
/**
 * FlinkSQL read/write MySQL.
 *
 * Reads a MySQL table through the JDBC connector and streams every row to a
 * `print` sink. FlinkSQL's JDBC source is mostly used for dimension-table
 * joins; aggregated results are typically written back to MySQL.
 *
 * create by LiuJinHe 2020/10/22
 */
object FlinkJdbcConnector {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    // Quiet down noisy framework loggers.
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("io.debezium").setLevel(Level.INFO)

    // Init the streaming environment.
    // Local testing with the web UI requires the flink-runtime-web dependency.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    // Restart strategy: fixed delay, 10 attempts, 3 s apart (disabled for local runs).
    // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))
    // Single thread for local testing.
    env.setParallelism(1)

    // Create the streaming table environment in streaming mode.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Checkpoint settings go through the table config.
    val tableConfig = tableEnv.getConfig.getConfiguration
    // Enable exactly-once checkpointing.
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint every 60 s; sinks commit after each completed checkpoint.
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
    // A checkpoint not finished within 60 s is discarded.
    tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
    // At least 30 s between two consecutive checkpoints.
    tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
    // Only one checkpoint in flight at a time.
    tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
    // Keep the externalized checkpoint when the job is cancelled manually.
    tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Catalog (in-memory example, unused):
    // val memoryCatalog = new GenericInMemoryCatalog("kafkaSourceTable", "memory")

    /**
     * MySQL source table.
     */
    val mysqlSourceSql =
      """
        |create table mysqlSourceTable (
        |  ID bigint,
        |  NAME string,
        |  DEVELOPER_ID bigint,
        |  DEVELOPER_SHARE decimal(11,2),
        |  STATE tinyint,
        |  WEB_OPEN tinyint,
        |  IS_MULTIPLE tinyint,
        |  CP_GAME_NAME string,
        |  SM_GAME_NAME string,
        |  REMARK string,
        |  CP_GAME_ID int,
        |  UPDATE_TIME TIMESTAMP
        |) with (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://192.168.100.39:3306/game_platform?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'table-name' = 'games',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'scan.fetch-size' = '200'
        |)
      """.stripMargin
    // NOTE(review): the original URL was truncated at "zeroDa…" — assumed
    // zeroDateTimeBehavior=convertToNull; confirm against the source database.

    /**
     * Print sink (swap for a JDBC sink in production, see the commented DDL below).
     */
    val printSinkSql =
      """
        |create table printSinkTable (
        |  ID bigint,
        |  NAME string,
        |  DEVELOPER_ID bigint,
        |  DEVELOPER_SHARE decimal(11,2),
        |  STATE tinyint,
        |  WEB_OPEN tinyint,
        |  IS_MULTIPLE tinyint,
        |  CP_GAME_NAME string,
        |  SM_GAME_NAME string,
        |  REMARK string,
        |  CP_GAME_ID int,
        |  UPDATE_TIME TIMESTAMP
        |) with (
        |  'connector' = 'print'
        |)
      """.stripMargin

    // JDBC sink variant:
    // val printSinkSql =
    //   """
    //     |create table printSinkTable (
    //     |  ID bigint,
    //     |  NAME string,
    //     |  DEVELOPER_ID bigint,
    //     |  DEVELOPER_SHARE decimal(11,2),
    //     |  STATE tinyint,
    //     |  WEB_OPEN tinyint,
    //     |  IS_MULTIPLE tinyint,
    //     |  CP_GAME_NAME string,
    //     |  SM_GAME_NAME string,
    //     |  REMARK string,
    //     |  CP_GAME_ID int,
    //     |  UPDATE_TIME TIMESTAMP
    //     |) with (
    //     |  'connector' = 'jdbc',
    //     |  'url' = 'jdbc:mysql://localhost:3306/test_flink?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull',
    //     |  'username' = 'root',
    //     |  'password' = '863863',
    //     |  'table-name' = 'test-flink',
    //     |  'driver' = 'com.mysql.cj.jdbc.Driver',
    //     |  'sink.buffer-flush.interval' = '3s',
    //     |  'sink.buffer-flush.max-rows' = '1',
    //     |  'sink.max-retries' = '5'
    //     |)
    //   """.stripMargin

    val insertSql =
      """
        |insert into printSinkTable
        |select * from mysqlSourceTable
      """.stripMargin

    // Register both tables, then submit the insert job. Without these calls the
    // DDL/DML strings are never executed and the job does nothing.
    tableEnv.executeSql(mysqlSourceSql)
    tableEnv.executeSql(printSinkSql)
    tableEnv.executeSql(insertSql)
  }
}
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
<!-- <scope>provided</scope>-->
</dependency>
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论