javaFlink使⽤addSink⽅法保存流到mysql数据库中博主把核⼼的内容写在最前⾯,其他内容和完整的代码放在最后⾯哈:
⽂章⽬录
pom配置
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
主要代码
package;
import MyData2;// 格式见其他内容
import MyDataSource2;// 格式见其他内容
import DataStreamSource;
import StreamExecutionEnvironment;
public class GetMain {
public static void main(String[] args)throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = ExecutionEnvironment();
DataStreamSource<MyData2> sourceStream = env.addSource(new MyDataSource2());// 得到数据源
sourceStream.addSink(new MysqlSink());// 核⼼!保存到mysql
}
}
可以看到,使⽤流.addSink()就可以保存流的数据了,这个MysqlSink是⾃⼰写的保存逻辑,代码如下:
import MyData2;
import Configuration;
import RichSinkFunction;
import Connection;
import DriverManager;
import PreparedStatement;
public class MysqlSink extends RichSinkFunction<MyData2>{
private PreparedStatement ps =null;
private Connection connection =null;
String driver ="sql.jdbc.Driver";
String url ="jdbc:mysql://127.0.0.1:3306/my_test_db?useSSL=false";
String username ="testuser";
String password ="testpassword";
@Override
public void open(Configuration parameters)throws Exception {// 要执⾏的代码
super.open(parameters);// ⽤于建⽴连接
Class.forName(driver);//加载JDBC驱动
connection = Connection(url, username, password);
String sql ="insert into st_csv (col_1,col_2,col_3,col_4)"+
"values (?,?,?,'what');";
ps = connection.prepareStatement(sql);
}
@Override
public void invoke(MyData2 value, Context context)throws Exception {// 真正执⾏的操作
// 每次插⼊都会调⽤⼀次
// 这⾥的根据具体的操作逻辑来
// ps.setxxx(n,xxx) 这⾥的n代表要保存的位置,也就是要把数据拍到上⾯的String sql的第⼏个?(问号)上
ps.setString(1, String.valueOf(value.keyId));
ps.setString(2, String.valueOf(value.timestamp));
ps.setString(3, String.valueOf(value.num));
}
@Override
public void close()throws Exception {// 关闭操作
super.close();
if(connection !=null){
connection.close();
}
if(ps !=null){
ps.close();
}
}
}
只需要覆写open,invoke,close三个函数即可,⼀个⽤于打开连接,⼀个⽤于执⾏操作,⼀个⽤于关闭连接。其他内容:MyData2类,与⽣成数据源的类MyDataSource2
MyData2.java
import Arrays;
public class MyData2 {
public int keyId;
public long timestamp;
public int num;
public double[] valueList;
public MyData2(){
}
public MyData2(int accountId,long timestamp,int num,double[] valueList){ this.keyId = accountId;
this.timestamp = timestamp;
this.num = num;
this.valueList = valueList;
}
public long getKeyId(){
return keyId;
}
public void setKeyId(int keyId){
this.keyId = keyId;
}
public long getTimestamp(){
return timestamp;
}
public void setTimestamp(long timestamp){
this.timestamp = timestamp;
}
public double[]getValueList(){
return valueList;
}
public void setValueList(double[] valueList){
this.valueList = valueList;
}
public int getNum(){
return num;
}
public void setNum(int num){
this.num = num;
}
@Override
public String toString(){
return"MyData{"+
"keyId="+ keyId +
", timestamp="+ timestamp +
", num="+ num +
", valueList= "+ String(valueList)+
'}';
}
}
MyDataSource2.java
import SourceFunction;
import Random;
public class MyDataSource2 implements SourceFunction<MyData2>{
// 定义标志位,⽤来控制数据的产⽣
private boolean isRunning =true;
private final Random random =new Random(0);
@Override
public void run(SourceContext ctx)throws Exception {
while(isRunning){
}
}
@Override
public void cancel(){
isRunning =false;
mysql下载add produce
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。