kafka同步提交异步_SpringBoot-Kafka的集成与使⽤详解
4(⽣产者2:消息。。。
当我们发送消息到 Kafka后,有时我们需要确认消息是否发送成功,如果消息发送失败,就要重新发送或者执⾏对应的业务逻辑。下⾯分别演⽰如何在异步或者同步发送消息时,获取发送结果。
四、⽣产者2:消息回调、同步异步发送消息
1,获取异步发送消息的结果
(1)默认情况下 KafkaTemplate发送消息是采取异步⽅式,并且 kafkaTemplate提供了⼀个回调⽅法 addCallback,我们可以在回调⽅法中监控消息是否发送成功或在失败时做补偿处理:
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
kafka使用教程
// 发送消息
@GetMapping("/test")
public void test() {
kafkaTemplate.send("topic1", "消息回调测试").addCallback(success -> {
// 消息发送到的topic
String topic = RecordMetadata().topic();
// 消息发送到的分区
int partition = RecordMetadata().partition();
// 消息在分区内的offset
long offset = RecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + Message());
});
}
}
(2)消息发送成功后,控制台显⽰内容如下:
2,同步发送消息的结果
(1)默认情况下 KafkaTemplate发送消息是采取异步⽅式发送的,如果希望同步发送消息只需要在 send⽅法后⾯调⽤ get⽅法即可,get⽅法返回的即为结果(如果发送失败则抛出异常)
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
try {
// 同步发送消息
SendResult sendResult =
kafkaTemplate.send("topic1", "消息回调测试").get();
// 消息发送到的topic
String topic = RecordMetadata().topic();
// 消息发送到的分区
int partition = RecordMetadata().partition();
// 消息在分区内的offset
long offset = RecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
} catch (InterruptedException e) {
System.out.println("发送消息失败:" + e.getMessage());
} catch (ExecutionException e) {
System.out.println("发送消息失败:" + e.getMessage());
}
}
}
(2)get⽅法还有⼀个重载⽅法get(long timeout, TimeUnit unit),当 send⽅法耗时⼤于 get⽅法所设定的参数时会抛出⼀个超时异常。⽐如下⾯我们设置了超时时长为 1 微秒(肯定超时):
注意:虽然超时了,但仅仅是抛出异常,消息还是会发送成功的。
@RestController
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
// 发送消息
@GetMapping("/test")
public void test() {
try {
// 同步发送消息(并且耗时限制在1ms)
SendResult sendResult =
kafkaTemplate.send("topic1", "消息回调测试").get(1, TimeUnit.MICROSECONDS); // 消息发送到的topic
String topic = RecordMetadata().topic();
// 消息发送到的分区
int partition = RecordMetadata().partition();
// 消息在分区内的offset
long offset = RecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
} catch (TimeoutException e) {
System.out.println("发送消息超时");
} catch (InterruptedException e) {
System.out.println("发送消息失败:" + e.getMessage());
} catch (ExecutionException e) {
System.out.println("发送消息失败:" + e.getMessage());
}
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。