Flink SQL：Kafka 流式写入 Hive
public class Kafka2Hive {
public static void main(String[] args){
StreamExecutionEnvironment environment = ExecutionEnvironment(); EnvironmentSettings settings = wInstance().useBlinkPlanner().build();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnv = ate(environment, settings);
String name ="myhive";
String defaultDatabase ="mydatabase";
String hiveConfDir ="F:\\flink-demo\\src\\main\\resources";
HiveCatalog hive =new HiveCatalog(name, defaultDatabase, hiveConfDir);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
// hive ⽅⾔
// 已创建kafka catalog 以下建表语句不要执⾏
// String hive_sql = "CREATE TABLE hive_table (" +
// " user_id STRING," +
// " order_amount DOUBLE" +
// ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
// " 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
// " 'igger'='partition-time'," +
// " 'sink.partition-commit.delay'='1 h'," +
// " 'sink.partition-commit.policy.kind'='metastore,success-file'" +
// ")";
//
// uteSql(hive_sql);
// default ⽅⾔
// // 已创建kafka catalog 以下建表语句不要执⾏thrift
// String kafka_sql = "CREATE TABLE kafka_table (" +
/
/ " user_id STRING," +
// " order_amount DOUBLE," +
// " log_ts BIGINT," +
// " ts AS TO_TIMESTAMP(FROM_UNIXTIME(log_ts / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
// " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
// ") WITH (" +
// " 'connector' = 'kafka'," +
// " 'topic' = 'user_behavior'," +
// " 'properties.bootstrap.servers' = 'hadoop01:9092'," +
// " 'up.id' = 'testGroup'," +
// " 'de' = 'earliest-offset'," +
/
/ " 'format' = 'csv')";
//
// uteSql(kafka_sql);
"SELECT user_id, order_amount, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')"+ "FROM kafka_table");
}
}
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://hadoop01:9083</value>
</property>
启动 metastore 服务：hive --service metastore
测试数据：
1,10,1625886660000
2,3,1625886721000
1,5,1625887380000
1,6,1625887800000
2,30,1625886721000
1,4,1625889989000
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论