Flink实战(七⼗九):flink-sql使⽤(七)流式SQL应⽤(实
时使⽤)-flin。。。
 随着Flink 1.10的发布,对SQL的⽀持也⾮常强⼤。Flink 还提供了 MySql, Hive,ES, Kafka等连接器Connector,所以使⽤起来⾮常
⽅便。
  接下来咱们针对构建流式SQL应⽤⽂章的梗概如下:
  1. 搭建流式SQL应⽤所需要的环境准备。
  2. 构建⼀个按每⼩时进⾏统计购买量的应⽤。
  3. 构建每天以10分钟的粒度进⾏统计应⽤。
  4. 构建按分类进⾏排⾏,取出想要的结果应⽤。
1. 搭建流式应⽤所需要的环境准备
注意:elasticsearch,mysql要配置允许远程访问
   Kafka,⽤于将数据写⼊到Kafka中,然后Flink通过读取Kafka的数据然后再进⾏处理。版本号:2.11。
    MySQL, ⽤于保存数据的分类。Flink从中读取分类进⾏处理和计算 。版本号:8.0.15。
   ElasticSearch, ⽤于保存结果数据和进⾏索引存储。下载的时候可以在搜索引擎⾥边搜索“elasticsearch 国内”,这样就可以从
国内快速下载,要不然下载的太慢了。版本号:7.6.0。
   Kibana, ⽤于ES的结果展⽰,图形化的界⾯美观。 下载的时候也需要搜索“Kibana 国内”,⽐较快速。版本号:7.6.0。
    Flink, 核⼼的流处理程序,版本号:1.10。Flink⽀持国内镜像下载,这个到时候可以⾃⾏⼀下。
    Zookeeper,  Kafka依赖这个应⽤,所以也会⽤到的,这个什么版本都是可以的。我的版本号:3.4.12。
   当然我的是mac电脑,如果是mac电脑的话,下载ES和Kibana的时候要下载⽂件中带“darwin”字样的,可以在Mac中使⽤其他的
不能执⾏。应该是程序⾥边的编译不同,这个也是⼀个⼩坑。
    因为Flink需要连接Mysql, Elasticseratch , Kafka,所以也需要提前下载Flink所需要的Connector jar包到Flink的lib⾥边。
wget -P ./lib/ /maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \
wget -P ./lib/ /maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \
wget -P ./lib/ /maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.ja    wget -P ./lib/ /maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \
wget -P ./lib/ /maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
  环境都准备好了,那么需要把环境都启动起来,进⾏检查。
  Zookeeper 这个相信很多同学都会配置了,如果有不会配置的,可以⾃⼰搜索⼀下。
  我们先看⼀下最后的效果图,可能不是特别好,是这么个意思。
mysql社区版国内镜像下载
2. 构建⼀个按每个⼩时统计购买量应⽤。 
  我们写⼀个程序,往Kafka⾥边写数据,模拟⼀些连续的数据源头。
  ⾸先定义⼀个Pojo类。
public class UserBehavior {
//⽤户ID
public long userId;
//商品ID
public long itemId;
//商品类⽬ID
public  int categoryId;
//⽤户⾏为,包括{"pv","buy","cart", "fav"}
public String behavior;
/
/⾏为发⽣的时间戳,单位秒
public String ts;
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
public long getItemId() {
return itemId;
}
public void setItemId(long itemId) {
this.itemId = itemId;
}
public int getCategoryId() {
return categoryId;
}
public void setCategoryId(int categoryId) {
this.categoryId = categoryId;
}
public String getBehavior() {
return behavior;
}
public void setBehavior(String behavior) {
this.behavior = behavior;
}
public String getTimestamp() {
return ts;
}
public void setTimestamp(String ts) {
this.ts = ts;
}
}
  接着写⼀个往Kafka写数据的类。随机⽣成⽤于的⾏为,⾥边包括⽤户的id,类⽬id等。让程序运⾏起来。
import com.alibaba.fastjson.JSON;
import myflink.pojo.UserBehavior;
import org.apachemons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import urrent.TimeUnit;
/**
* @author huangqingshi
* @Date 2020-03-15
*/
public class KafkaWriter {
//本地的kafka机器列表
public static final String BROKER_LIST = "localhost:9092";
//kafka的topic
public static final String TOPIC_USER_BEHAVIOR = "user_behaviors";
//key序列化的⽅式,采⽤字符串的形式
public static final String KEY_SERIALIZER = "org.apache.kafkamon.serialization.StringSerializer";
//value的序列化的⽅式
public static final String VALUE_SERIALIZER = "org.apache.kafkamon.serialization.StringSerializer";
private static final String[] BEHAVIORS = {"pv","buy","cart", "fav"};
private static KafkaProducer<String, String> producer;
public static void writeToKafka() throws Exception{
//构建userBehavior, 数据都是随机产⽣的
int randomInt = Int(0, 4);
UserBehavior userBehavior = new UserBehavior();
userBehavior.setBehavior(BEHAVIORS[randomInt]);
Long ranUserId = Long(1, 10000);
userBehavior.setUserId(ranUserId);
int ranCate = Int(1, 100);
userBehavior.setCategoryId(ranCate);
Long ranItemId = Long(1, 100000);
userBehavior.setItemId(ranItemId);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
userBehavior.setTimestamp(sdf.format(new Date()));
//转换为json
String userBehaviorStr = JSONString(userBehavior);
//包装成kafka发送的记录
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_BEHAVIOR, null,                null, userBehaviorStr);
//发送到缓存
producer.send(record);
System.out.println("向kafka发送数据:" + userBehaviorStr);
//⽴即发送
producer.flush();
}
public static void main(String[] args) {
Properties props = new Properties();
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
props.put("key.serializer", KEY_SERIALIZER);
props.put("value.serializer", VALUE_SERIALIZER);
producer = new KafkaProducer<>(props);
while(true) {
try {
//每⼀秒写⼀条数据
TimeUnit.SECONDS.sleep(1);
writeToKafka();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
  本地idea Console 输出的结果是这样的:
    向kafka发送数据:{"behavior":"buy","categoryId":7,"itemId":75902,"timestamp":"2020-03-15T11:35:11Z","ts":"2020-03-
15T11:35:11Z","userId":4737}
  我们将Flink的任务数调整成10个,也就是同时执⾏的任务数。 位置在 conf/flink-conf.yaml,taskmanager.numberOfTaskSlots: 10,然后重启下。我的已经启动并且运⾏了3个任务,看下图:
  我们接下来运⾏Flink 内置的客户端。命令: bin/sql-client.sh embedded,这样我们就开始了Flink SQL之旅了。我们使⽤Flink的DDL,从Kafka⾥边读取数据,采⽤ProcessingTime的时间事件进⾏处理,为ts设置⽔位线,允许5秒延迟。更多参考  和 。⾥边的Kafka 连接以及相关的配置,相信⼤家都不是很陌⽣。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。