spring-cloud-stream结合kafka使⽤详解
1.pom⽂件导⼊依赖
<!-- kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
l⽂件配置
spring:
cloud:
stream:
kafka:
binder:
brokers: :xxxx // Kafka的消息中间件服务器地址
bindings:
xxx_output: // 通道名称
destination: xxx // 消息发往的⽬的地,对应topic 在发送消息的配置⾥⾯,group是不⽤配置的
// 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
xxx_input:
destination: xxx // 消息发往的⽬的地,对应topic
group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表⽰这是⼀个可绑定的发布通道
@Service
public class MqService {
@Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
springcloud怎么读音private MessageChannel oesWorkbenchChannel;
/**
* 发送⼀条kafka消息
*/
public boolean sendLifeData(Object object) {
return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
}
}
// 发布通道
public interface Source {
@Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道⽤MessageChannel
}
4.创建消息监听者
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
@Resource
private FileService fileService;
@_input) // 监听接受通道
public void receiveData(MoveMessage moveMessage) {
}
}
// 接受通道
public interface Sink {
@Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
SubscribableChannel oesWorkbenchMoveInput(); // 接受通道⽤SubscribableChannel
}
接下来就可以愉快的发送监听消息了
到此这篇关于spring-cloud-stream结合kafka使⽤详解的⽂章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索以前的⽂章或继续浏览下⾯的相关⽂章希望⼤家以后多多⽀持!
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论