第六章kafka专题之SpringBoot整合KAFKA之⽣产者代码实战案例1、Java实现KafkaProducer
import KafkaProducer;
import ProducerRecord;
import Properties;
public class SimpleKafkaProducer {
private static KafkaProducer<String, String> producer;
private final static String TOPIC ="adienTest2";
public SimpleKafkaProducer(){
Properties props =new Properties();
//服务器IP
props.put("bootstrap.servers","localhost:9092");
props.put("acks","all");
props.put("retries",0);
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("",33554432);
//序列化器,序列化成字节数组byte[]
props.put("key.serializer","org.apache.kafkamon.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafkamon.serialization.StringSerializer");
//设置分区类,根据key进⾏数据分区
producer =new KafkaProducer<String, String>(props);
}
public void produce(){
for(int i =30;i<40;i++){
String key = String.valueOf(i);
String data ="hello kafka message:"+key;
producer.send(new ProducerRecord<String, String>(TOPIC,key,data));
System.out.println(data);
}
producer.close();
}
public static void main(String[] args){
new SimpleKafkaProducer().produce();
}
}
2、⽣产者到Broker发送流程
①kafkaProcuder发送的消息先进⼊客户端本地内存缓冲区(默认是16kb)
②之后把很多消息收集到Bath⾥
③最后⼀次性发送到Broker上
//设置发送消息本地缓冲⼤⼩,消息会优先发送到本地缓冲区
props.put("",33554432);
//设置批量发送消息的⼤⼩,如果⼀个batch满了,即达到16k就会发送出去
props.put("batch.size",16384);
//设置消息延迟发送时间,
/**
①如果在1毫秒内,这个batch满了就会随batch⼀起发送出去
②如果1ms到了还没满16k也必须发送出去
*/
props.put("linger.ms",1);
3、同步发送&异步发送
消息发送主要涉及两个线程:Main⽤户主线程,Sender线程
Main线程:发送消息到消息内存缓冲区后⽴即返回
sender线程:从消息内存缓冲区拉取数据到broker
3.1、同步发送
原理:⽣产者发送消息后没有收到ack,⽣产者会阻塞3s,之后重试发送3次返回:发送消息后返回的是⼀个Future对象,调⽤get进⾏阻塞
(1)编写脚本 – 同步发送
package;
import FuncTrue;
import KafkaProducer;
springboot推荐算法import Producer;
import ProducerRecord;
import RecordMetadata;
import Test;
import Properties;
import ExecutionException;
import Future;
public class MyProducer {
private static final String TOPIC_NAME ="first";
// 1、封装配置属性
public static Properties getProperties(){
Properties props =new Properties();
props.put("bootstrap.servers","192.168.6.102:9092");
props.put("acks","all");
props.put("retries",0);
props.put("linger.ms","1");
props.put("batch.size",16384);
props.put("",3554432);
props.put("key.serializer","org.apache.kafkamon.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafkamon.serialization.StringSerializer");
return props;
}
// 2、⽣产者同步发送
@Test
public void testSend(){
Properties properties =getProperties();
// 传递参数
Producer<String,String> producer =new KafkaProducer<String, String>(properties);
// 发送消息
for(int i=0;i<3;i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,"key","value"+i));
try{
//使⽤get⽅法进⾏阻塞
RecordMetadata recordMetadata = ();
//打印发送内容:topi-partition@offset
System.out.println("发送装:"+String());
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
}
// 关闭⽣产者
producer.close();
}
}
(2)运⾏结果
3.2、异步发送
原理:⽣产者发送完消息后就可以执⾏之后的任务,broker在收到消息后异步调⽤⽣产者提供的callback⽅法返回:配置回调函数,在Prducer收到ack时被调⽤
(1)编写脚本 – 异步发送
回调函数的两个参数:RecordMetadata和Exception,如果Exception是null,则消息发送成功,否则发送失败package;
import FuncTrue;
import*;
import Test;
import Properties;
import ExecutionException;
import Future;
public class MyProducer {
private static final String TOPIC_NAME ="first";
// 1、封装配置属性
public static Properties getProperties(){
Properties props =new Properties();
props.put("bootstrap.servers","192.168.6.102:9092");
props.put("acks","all");
props.put("retries",0);
props.put("linger.ms","1");
props.put("batch.size",16384);
props.put("",3554432);
props.put("key.serializer","org.apache.kafkamon.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafkamon.serialization.StringSerializer");
return props;
}
// 2、⽣产者同步发送
@Test
public void testSend(){
Properties properties =getProperties();
// 传递参数
Producer<String,String> producer =new KafkaProducer<String, String>(properties);
// 发送消息
for(int i=0;i<3;i++){
producer.send(new ProducerRecord<>(TOPIC_NAME,"key","value"+ i),new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception){
if(exception ==null){
System.out.println("发送状态:"+String());
}
else{
exception.printStackTrace();
}
}
});
}
// 关闭⽣产者
producer.close();
}
}
(2)运⾏结果
4、发送消息到Broker的分区配置
(1)分区配置
如果指定Partition ID,则Record被发送⾄指定Partition
如果未指定Partition但指定了Key,则Record按照hash(key)发送⾄对应key
如果未指定PartitionID,也没指定Key,Record会按照轮询模式发送到每个Partition
如果同时指定了Partition和key,Record只会发送到指定的Partition,key不起作⽤
(2)PeoducerRecord概述
发送给Kafka Broker的key/value值对,封装基础数据信息
数据格式:topic+Partition+Key+Value
(3)Key概述
如果key为空,kafka使⽤默认的partitioner,使⽤RoundRobin算法将消息均衡分布到各个partition上如果key不为空,kafka使⽤⾃⼰实现的hash⽅法对key进⾏散列,决定消息该被写到topic的那个partition上拥有相同key的消息会被写到同⼀个partition,实现顺序消息
(4)代码实战 - 指定分区发送
查看源码
编写代码
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论