springcloudstream学习
spring cloud stream 学习
基于《spring cloud stream 3.1.x》
在学习spring cloud stream 之前, 先要了解⼀下 spring cloud function 模块, 新版采⽤ function 函数引⼊依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
配置 rabbitmq:
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
Consuemr<?>
在 spring cloud stream 中充当消费者⾓⾊
application.properties :
# 定义函数
spring.cloud.function.definition=upperCase
# 定义绑定
spring.cloud.stream.function.bindings.upperCase-in-0=upperCaseBinding
# spring.cloud.stream.function.bindings.<functionName>-in-<index>=bindingName
# spring.cloud.stream.function.bindings.<functionName>-out-<index>=bindingName
# 绑定 exchange, 默认是 topic exchange
spring.cloud.stream.bindings.upperCaseBinding.destination=upperCaseBus
input - <functionName> + -in- + <index>
output - <functionName> + -out- + <index>
定义消费者
@Configuration
public class ConsumerConfiguratioin {
@Bean
public Consumer<String>upperCase(){
return s ->{
String r = s.toUpperCase();
System.out.println("consumer: "+ r);
};
}
}
//结果
consumer: HELLO
定义⼀个 direct 类型的 exchange
通过 bindingName 默认绑定的 exchange 类型是 topic 类型的, 那如何 定义⼀个 direct 类型的 exchange 呢?
# 定义函数
springcloud难学吗spring.cloud.function.definition=directEx
## 定义 direct exchange
spring.cloud.stream.function.bindings.directEx-in-0=directExBinding
# 绑定 exchange, 默认是 topic exchange
spring.cloud.stream.bindings.directExBinding.destination=directEx
spring.cloud.stream.rabbit.hange-type=direct
spring.cloud.stream.rabbit.sumer.binding-routing-key=goto
定义函数:
/**
* 定义 direct 类型的 exchange
*
* @return
*/
@Bean
public Consumer<String>directEx(){
return s ->{
String r = s.toUpperCase();
System.out.println("consumer: "+ r);
};
}
可以看见 rabbitmq 管理台 创建了⼀个 类型为 direct 的 名称为directEx 的 exchange !! Supplier<?>
定义⽣产者,不停的发送消息
application.properties 配置
# 定义函数
spring.cloud.function.definition=upperCase;sayHello
# 定义 supplier, 这⾥声明是⽣产者
spring.cloud.stream.source=sayHello
# 定义绑定
spring.cloud.stream.function.bindings.sayHello-out-0=sayHelloBinding
# 绑定 exchange, 默认是 topic exchange
spring.cloud.stream.bindings.sayHelloBinding.destination=sayHelloBus
定义⽣产者
/**
* 定义 producer
* 默认是每秒发送
*
* @return
*/
@Bean
public Supplier<String>sayHello(){
return()->{
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");            String str ="hello";
try{
TimeUnit.SECONDS.sleep(10);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("["+ formatter.w())+"] send: "+ str);
return str;
};
}
结果:
[2021-08-03 22:43:46] send: hello
[2021-08-03 22:43:57] send: hello
[2021-08-03 22:44:08] send: hello
动态发送
通过 StreamBridge 可以动态选择 bindingName 进⾏发送
@RestController
public class ProducerController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/send")
public String send(String msg){
streamBridge.send("passByBinding", msg);
return"ok";
}
}
Function<F,T> 消息转换器
相当于 消息进⾏转换管道 , 将 F 类型转换成 T 类型消息, 多个管道之间⽤ "|" 区分application.properties 配置
# 定义函数
spring.cloud.function.definition=upperCase;passBy|lowerCase
# 定义绑定
spring.cloud.stream.function.bindings.upperCase-in-0=upperCaseBinding
# 绑定 exchange, 默认是 topic exchange
spring.cloud.stream.bindings.upperCaseBinding.destination=upperCaseBus
## 定义 function,类似消息转换或者消息加⼯
spring.cloud.stream.function.bindings.passBy|lowerCase-in-0=passByBinding
spring.cloud.stream.function.bindings.passBy|lowerCase-out-0=upperCaseBus
spring.cloud.stream.bindings.passByBinding.destination=passByBus
使⽤动态发送
@RestController
public class ProducerController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("/send")
public String send(String msg){
streamBridge.send("passByBinding", msg);
return"ok";
}
}
结果:
passBy: ECHO: meme
lowerCase: echo: meme
consumer: ECHO: MEME
消息⾛向: 消息先发送到 passBy -> 然后到 lowerCase -> 最后到消费者 consumer good luck!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。