Redis实战——Redis的pubSub(订阅与发布)在java中的实现借鉴:blog.csdn/canot/article/details/51938955
1.什么是pub/sub
Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是⽬前⼴泛使⽤的通信模型,它采⽤事件作为基本的通信机制,提供⼤规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的⽅式表达出它有兴趣接收的⼀个事件或⼀类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是⼀种消息通信模式,主要的⽬的是解除消息发布者和消息订阅者之间的耦合,Redis作为⼀个pub/sub的server,在订阅者和发布者之间起到了消息路由的功能。
2.Redis pub/sub的实现
Redis通过publish和subscribe命令实现订阅和发布的功能。订阅者可以通过subscribe向redis server订阅⾃⼰感兴趣的消息类型。redis将信息类型称为通道(channel)。当发布者通过publish命令向redis server发送特定类型的信息时,订阅该消息类型的全部订阅者都会收到此消息。
客户端1订阅CCTV1:
127.0.0.1:6379> subscribe CCTV1
(press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1
客户端2订阅CCTV1和CCTV2:
127.0.0.1:6379> subscribe CCTV1 CCTV2
(press Ctrl-C to quit)
1) "subscribe"
2) "CCTV1"
3) (integer) 1
1) "subscribe"
2) "CCTV2"
3) (integer) 2
此时这两个客户端分别监听这指定的频道。现在另⼀个客户端向服务器推送了关于这两个频道的信息。
127.0.0.1:6379> publish CCTV1 "cctv1 is good"
(integer) 2
//返回2表⽰两个客户端接收了次消息。被接收到消息的客户端如下所⽰。
1) "message"
2) "CCTV1"
3) "cctv1 is good"
-
---
1) "message"
2) "CCTV1"
3) "cctv1 is good"
如上的订阅/发布也称订阅发布到频道(使⽤publish与subscribe命令),此外还有订阅发布到模式(使⽤psubscribe来订阅⼀个模式)
订阅CCTV的全部频道
127.0.0.1:6379> psubscribe CCTV*
(press Ctrl-C to quit)
1) "psubscribe"
2) "CCTV*"
3) (integer) 1
当依然先如上推送⼀个CCTV1的消息时,该客户端正常接收。
3.Pub/Sub在java中的实现
导⼊Redis驱动:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
Redis驱动包提供了⼀个抽象类:JedisPubSub…继承这个类就完成了对客户端对订阅的监听。⽰例代码: /**
* redis发布订阅消息
* @ClassName: RedisMsgPubSubListener
* @Description: TODO
* @author OnlyMate
* @Date 2018年8⽉22⽇上午10:05:35
*
*/
public class RedisMsgPubSubListener extends JedisPubSub {
private Logger logger = Logger(RedisMsgPubSubListener.class);
@Override
public void unsubscribe() {
super.unsubscribe();
}
@Override
public void channels) {
super.unsubscribe(channels);
}
@Override
public void channels) {
super.subscribe(channels);
}
@Override
public void patterns) {
super.psubscribe(patterns);
}
@Override
public void punsubscribe() {
super.punsubscribe();
}
@Overridespring framework是什么系统
public void patterns) {
super.punsubscribe(patterns);
}
@Override
public void onMessage(String channel, String message) {
logger.info("onMessage: channel[{}], message[{}]",channel, message);
}
@Override
public void onPMessage(String pattern, String channel, String message) {
logger.info("onPMessage: pattern[{}], channel[{}], message[{}]", pattern, channel, message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
logger.info("onSubscribe: channel[{}], subscribedChannels[{}]", channel, subscribedChannels);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
logger.info("onPUnsubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
logger.info("onPSubscribe: pattern[{}], subscribedChannels[{}]", pattern, subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
logger.info("channel:{} is been subscribed:{}", channel, subscribedChannels);
}
}
如上所⽰,抽象类中存在的⽅法。分别表⽰
监听到订阅模式接受到消息时的回调 (onPMessage)
监听到订阅频道接受到消息时的回调 (onMessage )
订阅频道时的回调( onSubscribe )
取消订阅频道时的回调( onUnsubscribe )
订阅频道模式时的回调 ( onPSubscribe )
取消订阅模式时的回调( onPUnsubscribe )
运⾏我们刚刚编写的类:
订阅者
/**
* 订阅者
* @ClassName: RedisSubTest
* @Description: TODO
* @author OnlyMate
* @Date 2018年8⽉23⽇下午2:59:42
*
*/
public class RedisSubTest {
@Test
public void subjava() {
System.out.println("订阅者 ");
Jedis jr = null;
try {
jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端⼝号
RedisMsgPubSubListener sp = new RedisMsgPubSubListener();
// jr客户端配置监听两个channel
jr.subscribe(sp, "news.share", "news.blog");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jr != null) {
jr.disconnect();
}
}
}
}
发布者
/**
* 发布者
* @ClassName: RedisPubTest
* @Description: TODO
* @author OnlyMate
* @Date 2018年8⽉23⽇下午2:59:25
*
*/
public class RedisPubTest {
@Test
public void pubjava() {
System.out.println("发布者 ");
Jedis jr = null;
try {
jr = new Jedis("127.0.0.1", 6379, 0);// redis服务地址和端⼝号
// jr客户端配置监听两个channel
jr.publish( "news.share", "新闻分享");
jr.publish( "news.blog", "新闻博客");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (jr != null) {
jr.disconnect();
}
}
}
}
从代码中我们不难看出,我们声明的⼀个redis链接在设置监听后就可以执⾏⼀些操作,例如发布消息,订阅消息等。。。当运⾏上述代码后会在控制台输出:
此时当在有客户端向new.share或者new.blog通道publish消息时,onMessage⽅法即可被相应。(jedis.publish(channel, message))。
4.Pub/Sub在Spring中的实践
导⼊依赖jar
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
核⼼消息
/**
* redis发布订阅消息
* @ClassName: RedisMsgPubSubListener
* @Description: TODO
* @author OnlyMate
* @Date 2018年8⽉22⽇上午10:05:35
*
*/
public class RedisMsgPubSubListener implements MessageListener {
private Logger logger = Logger(RedisMsgPubSubListener.class);
@Override
public void onMessage( final Message message, final byte[] pattern ) {
RedisSerializer<?> serializer = ValueSerializer();
// Body()是Redis的值,需要⽤redis的valueSerializer反序列化
logger.info("Message receive-->pattern:{},message: {},{}", new String(pattern),
serializer.Body()),
logger.String());
JSONObject json = JSONObject.parseObject(serializer.Body()).toString());
String cutomerId = String("cutomerId");
//可以与WebSocket结合使⽤,解决分布式服务中,共享Session
if(StringUtils.isNotEmpty(cutomerId)) {
logger.info("cutomerId: {},消息:{}", cutomerId, String());
}else {
logger.info("cutomerId 为空,⽆法推送给对应的客户端,消息:{}", String());
}
}
}
现在我们在获取RedisTemplate,并给WEB_SOCKET:LOTTERY这个channel publish数据。
/**
* 发布者
* @ClassName: RedisMsgPubClient
* @Description: TODO
* @author OnlyMate
* @Date 2018年8⽉23⽇下午3:59:33
*
*/
@Controller
@RequestMapping(value="/redisMsgPubClientBySpring")
public class RedisMsgPubClient {
private Logger logger = Logger(RedisMsgPubClient.class);
@Autowired
private RedisTemplate<Object,Object> redisTemplate;
@RequestMapping
@ResponseBody
public String pubMsg(HttpServletRequest request, HttpServletResponse response) {
String cutomerId = Parameter("cutomerId").toString();
String msg = Parameter("msg").toString();
logger.info("发布消息:{}", Parameter("msg").toString());
JSONObject json = new JSONObject();
json.put("cutomerId", cutomerId);
json.put("msg", msg);
return "成功";
}
}
最后⼀步reids的配置
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="/schema/beans"
xmlns:xsi="/2001/XMLSchema-instance" xmlns:context="/schema/context"
xmlns:aop="/schema/aop" xmlns:tx="/schema/tx"
xmlns:mvc="/schema/mvc" xmlns:cache="www.springframework.
org/schema/cache"
xmlns:p="/schema/p"
xsi:schemaLocation="/schema/beans
/schema/beans/spring-beans.xsd
/schema/context
/schema/context/spring-context.xsd
/schema/aop
/schema/aop/spring-aop.xsd
/schema/tx
/schema/tx/spring-tx.xsd
/schema/mvc
/schema/mvc/spring-mvc.xsd
/schema/cache
/schema/cache/spring-cache.xsd"
default-autowire="byName">
<description>redis 相关类 Spring 托管</description>
<!-- 开启缓存 -->
<cache:annotation-driven />
<bean name="springCacheAnnotationParser" class="org.springframework.cache.annotation.SpringCacheAnnotationParser"></bean> <bean name="annotationCacheOperationSource" class="org.springframework.cache.annotation.AnnotationCacheOperationSource"> <constructor-arg>
<array>
<ref bean="springCacheAnnotationParser"/>
</array>
</constructor-arg>
</bean>
<bean name="cacheInterceptor" class="org.springframework.cache.interceptor.CacheInterceptor">
<property name="cacheOperationSources" ref="annotationCacheOperationSource"/>
</bean>
<bean class="org.springframework.cache.interceptor.BeanFactoryCacheOperationSourceAdvisor">
<property name="cacheOperationSource" ref="annotationCacheOperationSource"/>
<property name="advice" ref="cacheInterceptor"/>
<property name="order" value="2147483647"/>
</bean>
<!--载⼊ redis 配置⽂件-->
<context:property-placeholder location="classpath:redis.properties" ignore-unresolvable="true"/>
<!-- 配置JedisConnectionFactory -->
<bean id="jedisConnectionFactory" class="org.tion.jedis.JedisConnectionFactory">
<property name="hostName" value="${redis.host}"/>
<property name="port" value="${redis.port}"/>
<property name="password" value="${redis.pass}"/>
<property name="database" value="${redis.dbIndex}"/>
<property name="poolConfig" ref="jedisPoolConfig"/>
<!-- <constructor-arg name="sentinelConfig" ref="redisSentinelConfiguration" /> -->
<constructor-arg name="poolConfig" ref="jedisPoolConfig"/>
</bean>
<!-- 配置 JedisPoolConfig 实例 -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<!-- 最⼤连接数 -->
<property name="maxTotal" value="${redis.pool.maxActive}"/>
<!-- 最⼤空闲时间 -->
<property name="maxIdle" value="${redis.pool.maxIdle}"/>
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论