Flink SQL: Streaming Kafka Data into Hive

The job below registers a HiveCatalog, declares a Kafka source table and a partitioned Hive sink table (both DDLs are kept commented out because the tables already exist in the catalog), and then runs a continuous INSERT that streams Kafka records into hourly Hive partitions.
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class Kafka2Hive {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // The streaming Hive sink only finalizes files on checkpoints, so checkpointing must be enabled.
        environment.enableCheckpointing(10_000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(environment, settings);

        String name            = "myhive";
        String defaultDatabase = "mydatabase";
        String hiveConfDir     = "F:\\flink-demo\\src\\main\\resources";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(name, hive);
        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");

        // Hive dialect: the partitioned sink table is created with Hive DDL.
        // The table already exists in the catalog, so this DDL is kept commented out.
//        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
//        String hive_sql = "CREATE TABLE hive_table (" +
//                "  user_id STRING," +
//                "  order_amount DOUBLE" +
//                ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (" +
//                "  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'," +
//                "  'sink.partition-commit.trigger'='partition-time'," +
//                "  'sink.partition-commit.delay'='1 h'," +
//                "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
//                ")";
//        tableEnv.executeSql(hive_sql);

        // Default dialect: the Kafka source table is created with Flink DDL.
        // The table already exists in the catalog, so this DDL is kept commented out.
//        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
//        String kafka_sql = "CREATE TABLE kafka_table (" +
//                "  user_id STRING," +
//                "  order_amount DOUBLE," +
//                "  log_ts BIGINT," +
//                "  ts AS TO_TIMESTAMP(FROM_UNIXTIME(log_ts / 1000, 'yyyy-MM-dd HH:mm:ss'))," +
//                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
//                ") WITH (" +
//                "  'connector' = 'kafka'," +
//                "  'topic' = 'user_behavior'," +
//                "  'properties.bootstrap.servers' = 'hadoop01:9092'," +
//                "  'properties.group.id' = 'testGroup'," +
//                "  'scan.startup.mode' = 'earliest-offset'," +
//                "  'format' = 'csv')";
//        tableEnv.executeSql(kafka_sql);

        // Continuously stream Kafka rows into hourly Hive partitions.
        tableEnv.executeSql("INSERT INTO hive_table " +
                "SELECT user_id, order_amount, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
                "FROM kafka_table");
    }
}
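With 'sink.partition-commit.trigger'='partition-time' and a delay of '1 h', a partition such as dt=2021-07-10/hr=11 is committed to the metastore once the watermark passes the end of that hour plus one hour. After partitions have been committed, the data can be read back through the same catalog. A minimal verification sketch (the class name ReadBackCheck and the dt value are illustrative assumptions, not part of the original job):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class ReadBackCheck {
    public static void main(String[] args) {
        // A one-off batch query is enough to inspect committed partitions.
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        tableEnv.registerCatalog("myhive",
                new HiveCatalog("myhive", "mydatabase", "F:\\flink-demo\\src\\main\\resources"));
        tableEnv.useCatalog("myhive");
        // Pick a partition the streaming job has actually committed.
        tableEnv.executeSql(
                "SELECT user_id, order_amount, dt, hr FROM hive_table WHERE dt = '2021-07-10'")
                .print();
    }
}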
The hiveConfDir above must contain a hive-site.xml whose metastore URI points at a running Hive metastore:

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://hadoop01:9083</value>
</property>
Start the metastore service so the catalog can connect:

hive --service metastore
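Before submitting the streaming job, the catalog connection can be sanity-checked by listing databases through the HiveCatalog directly. A minimal sketch (MetastoreCheck is a hypothetical helper class; the constructor arguments mirror Kafka2Hive above):

import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MetastoreCheck {
    public static void main(String[] args) {
        HiveCatalog hive = new HiveCatalog("myhive", "mydatabase",
                "F:\\flink-demo\\src\\main\\resources");
        hive.open();  // connects to the thrift metastore configured in hive-site.xml
        System.out.println(hive.listDatabases());  // expect to see mydatabase listed
        hive.close();
    }
}

Test data for the user_behavior topic (CSV: user_id,order_amount,log_ts):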
1,10,1625886660000
2,3,1625886721000
1,5,1625887380000
1,6,1625887800000
2,30,1625886721000
1,4,1625889989000
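Each row's log_ts is an epoch timestamp in milliseconds, and the computed column ts drives both the watermark and the (dt, hr) partition values. The mapping can be previewed offline with plain Java; PartitionPreview is a hypothetical helper, and Asia/Shanghai is an assumed session timezone, so adjust the ZoneId to match the cluster:

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionPreview {
    public static void main(String[] args) {
        long[] logTs = {1625886660000L, 1625886721000L, 1625887380000L,
                        1625887800000L, 1625886721000L, 1625889989000L};
        ZoneId zone = ZoneId.of("Asia/Shanghai");  // assumption: cluster/session timezone
        DateTimeFormatter dt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        DateTimeFormatter hr = DateTimeFormatter.ofPattern("HH");
        for (long t : logTs) {
            ZonedDateTime zoned = Instant.ofEpochMilli(t).atZone(zone);
            // e.g. log_ts=1625886660000 -> dt=2021-07-10, hr=11 in UTC+8
            System.out.printf("log_ts=%d -> dt=%s, hr=%s%n",
                    t, dt.format(zoned), hr.format(zoned));
        }
    }
}

Under that timezone the sample rows fall into hr=11 and hr=12 of 2021-07-10, so with the one-hour commit delay the hr=11 partition is committed only after the watermark passes 13:00:00.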
