关于通过flink接⼊带ssl验证的kafka数据的相关问题总结。
场景描述:之前是做kafka不是通过ssl验证的⽅式进⾏接⼊的,所以就是正常的接受数据。发现我们通过aws服务器去访问阿⾥云服务器上的kafka的时候,我们服务器要把全部的⽹关
开放给阿⾥云服务器的kafka这样的话数据就很不安全。所以就从阿⾥买了kafka服务器这样就能通过公⽹去访问服务器,然后带验证的kafka 集。
下⾯是flink连接kafka不⽤验证的代码:
不⽤验证的kafka,flink 读取数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = ExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "IP:9093");
properties.setProperty("group.id", "tests-consumer-ssgroup20200922");
properties.setProperty("set", "latest");
System.out.println("11111111111");
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010("binlog", new SimpleStringSchema(), properties);
DataStream<String> keyedStream = env.addSource(myConsumer).setParallelism(1);
System.out.println("2222222222222");
System.out.println("3333333333333");
keyedStream.addSink(new MysqlSink()).setParallelism(1).name("数据插⼊mysql").disableChaining();
}
需要验证的kafka,flink读取数据
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = ExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props = new Properties();
//设置接⼊点,请通过控制台获取对应Topic的接⼊点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9093");
//设置SSL根证书的路径,请记得将XXX修改为⾃⼰的路径
//与sasl路径类似,该⽂件也不能被打包到jar中
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/home/hadoop/uststore.jks");
//根证书store的密码,保持不变
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
//接⼊协议,⽬前⽀持使⽤SASL_SSL协议接⼊
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//SASL鉴权⽅式,保持不变
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//两次poll之间的最⼤允许间隔
//可更加实际拉去数据和客户的版本等设置此值,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//设置单次拉取的量,⾛公⽹访问时,该参数会有较⼤影响
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
//每次poll的最⼤数量
//注意该值不要改得太⼤,如果poll太多数据,⽽不能在下次poll之前消费完,则会触发⼀次负载均衡,产⽣卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化⽅式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafkamon.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafkamon.serialization.StringDeserializer");
//当前消费实例所属的消费组,请在控制台申请之后填写
//属于同⼀个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bi-binlog");
//hostname校验改成空
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
//props.put("set", "earliest");
kafka命令props.put("fig",
"org.apache.kafkamon.security.scram.ScramLoginModule required username='username' password='password';"); //这⾥的分号⼀定要注意
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010("binlog", new SimpleStringSchema(), props);
DataStream<String> keyedStream = env.addSource(myConsumer).setParallelism(1);
System.out.println("2222222222222");
System.out.println("3333333333333");
// System.out.println(keyedStream.print());
keyedStream.addSink(new MysqlSink()).setParallelism(1).name("数据插⼊mysql").disableChaining();
// keyedStream.addSink(new S3Sink()).setParallelism(2).name("数据插⼊s3");
}
因为kafka是要经过ssl验证的,因为我们这⾥是使⽤的阿⾥的kafka所以他有⼀个验证⽂件uststore.jks,这个⽂件是kafka的验证⽂件。必须放在服务器的本地。他不能通过读取hdfs上的⽂件。
所以在这⾥⼀定要注意将这个⽂件放在你flink 集的各个节点上⾯都放⼀份。要不然他就会报下⾯的错误。
flink的相关环境bubu部署好之后就将代码打包提交到服务器上。提交的命令如下:
flink run -m yarn-cluster -yjm 1024 -ytm 1024 -c WordCount /mnt/flinkjar/mysqlflink.jar
我们的代码是提交到yarn上⾯去的,这⾥给jobmanager和taskmanager各⾃分配了⼀个G的内存。
然后我将代码提交之后就开始报错报错的内容如下:
2020-09-2711:35:40,927 INFO org.apache.point.ClusterEntrypoint
- Shutting YarnJobClusterEntrypoint down with application status FAILED.
Diagnostics java.lang.NoSuchMethodError:
org.figuration.ConfigOptions$OptionBuilder.stringType()Lorg/apache/flink/configuration/ConfigOptions$TypedConfigOptionBuilder;
at org.apache.figuration.YarnConfigOptions.<clinit>(YarnConfigOptions.java:214)
at org.apache.RPCPortRange(YarnJobClusterEntrypoint.java:63)
at org.apache.point.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:246)
at org.apache.point.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
at org.apache.point.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.point.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
at org.apache.point.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
at org.apache.point.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:119)
.
2020-09-2711:35:40,937 ERROR org.apache.point.ClusterEntrypoint - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.point.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
at org.apache.point.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:182)
at org.apache.point.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
at org.apache.point.YarnJobClusterEntrypoint.main(Y
看到这⾥的错误搜索了半天也没有。也没到答案。最后发现我们新做的emr集当中的flink的版本是1.10.0但是原来那个emr集当中的版本是1.9.0。然后去pom⽂件将版本修改正确
然后执⾏⼀下代码发现可以了,⾄此问题得到解决了。
从这个错误当中可以看出遇到 NoSuchMethodError ⼀般都是出现了组件的版本和代码的版本不同造成的。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论