FlinkSQL实践--时态表版本表
1. 背景
在FlinkSQL关联时,必然会涉及到维表,维表⼜可能是不断变化的(aka 时态表或版本表)。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来⾃数据库的 changelog 可以定义成版本表。普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来⾃数据库或 HBase 的表可以定义成普通表。
2. FlinkSQL中时态表类型
2.1 Debezium
2.2 a compacted Kafka topic
bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
kafka最新版本" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'up.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
);
对应的kafka topic⽣产者代码
// 创建消息
// DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:nnn");
for (int i = 1; i < 5; i++) {
JSONObject json1 = new JSONObject();
json1.put("key", i+"");
//json.put("update_time", dtf.w()));
JSONObject json = new JSONObject();
json.put("id", i+"");
json.put("name", "name222"+i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"flinksqldim",
);
// 发送消息
Future<RecordMetadata> future = producer.send(record);
}
3. 遇到的问题
3.1 kafka⽀持的METADATA
3.2 key.format
必须指定key.format,可以不指定'de' = 'earliest-offset'
4. 引⽤
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论