CIM即时通讯源码初步解析(⼀款个⼈推荐的带集的开源项⽬)BindMessageListener
如下代码作为理解⼊⼝,安卓、IOS、web在连接成功后才会发起所谓登录,登录时会携带必要信息,例如个⼈账号唯⼀标识uid、设备类型deviceId等字段发起登录请求,⽬前重写的onMessage回调⽅法中只是对登录逻辑进⾏处理,若要实现单聊及聊需要额外拓展参数msgType(信息参数类型),根据信息类型作出相应的逻辑处理,在登录逻辑中:
1.Session session = JSONUtils.Body(), Session.class); String uid = Uid(); 获取uid,后⾯根据uid及设备限定类型过滤筛选出此账号的所有频道(因为⼀个账号可以在不同终端登录,⽽同⼀终端只能⼀个账号登录)
veIf(channel -> Nid().equals(channel.attr(ChannelAttr.ID).get()));意思是将要通知下线的频道集合中过滤掉本次连接的频道,不然也会通知刚连接上的频道
3.Collection<Channel> 相同账号不同终端的频道都放在此
4.@Resource private SessionGroup sessionGroup;sessionGroup是存放所有频道信息的容器,后续单聊、聊拓展需要从此理解及开发逻辑package;
import Session;
import ChannelAttr;
import SessionGroup;
import Message;
import JSONUtils;
import Channel;
import MessageListener;
import Component;
import Resource;
import Collection;
import HashMap;
import Map;
import Objects;
/**
* 集环境下,监控多设备登录情况,控制是否其余终端下线的逻辑
*/
@Component
public class BindMessageListener implements MessageListener {
private static final String FORCE_OFFLINE_ACTION ="999";
private static final String SYSTEM_ID ="0";
/*
⼀个账号只能在同⼀个类型的终端登录
如: 多个android或ios不能同时在线
⼀个android或ios可以和web,桌⾯同时在线
*/
private final Map<String,String[]> conflictMap =new HashMap<>();
@Resource
private SessionGroup sessionGroup;
public BindMessageListener(){
conflictMap.put(Session.CHANNEL_ANDROID,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
conflictMap.put(Session.CHANNEL_IOS,new String[]{Session.CHANNEL_ANDROID,Session.CHANNEL_IOS});
conflictMap.put(Session.CHANNEL_WINDOWS,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
conflictMap.put(Session.CHANNEL_WEB,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
conflictMap.put(Session.CHANNEL_MAC,new String[]{Session.CHANNEL_WINDOWS,Session.CHANNEL_WEB,Session.CHANNEL_MAC});
}
@Override
public void onMessage(Message redisMessage,byte[] bytes){
Session session = JSONUtils.Body(), Session.class);
String uid = Uid();
String[] conflictChannels = (Channel());
Collection<Channel> channelList = sessionGroup.find(uid,conflictChannels);
/*
* 获取到其他在线的终端连接,提⽰账号再其他终端登录
*/
channelList.forEach(channel ->{
if(Objects.DeviceId(),channel.attr(ChannelAttr.DEVICE_ID).get())){
channel.close();
return;
}安卓在线解析json
Message message =new Message();
message.setAction(FORCE_OFFLINE_ACTION);
message.setReceiver(uid);
message.setSender(SYSTEM_ID);
message.DeviceName());
channel.writeAndFlush(message);
channel.close();
});
}
}
netty即时通讯SDK部分源码
package;
import ChannelAttr;
import Message;
import Channel;
import ChannelFuture;
import ChannelFutureListener;
import*;
import ConcurrentHashMap;
import ConcurrentLinkedQueue;
import Predicate;
import Collectors;
public class SessionGroup extends ConcurrentHashMap<String, Collection<Channel>>{
private static final Collection<Channel> EMPTY_LIST =new LinkedList<>();
private final transient ChannelFutureListener remover =new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future){
remove(future.channel());
}
};
protected String getKey(Channel channel){
return channel.attr(ChannelAttr.UID).get();
}
public void remove(Channel channel){
String uid =getKey(channel);
if(uid ==null){
return;
return;
}
Collection<Channel> collections =getOrDefault(uid,EMPTY_LIST);
if(collections.isEmpty()){
remove(uid);
}
}
public void add(Channel channel){
String uid =getKey(channel);
if(uid ==null||!channel.isActive()){
return;
}
channel.closeFuture().addListener(remover);
Collection<Channel> collections =this.putIfAbsent(uid,new ConcurrentLinkedQueue<>(Collections.singleton(channel)));
if(collections !=null){
collections.add(channel);
}
if(!channel.isActive()){
remove(channel);
}
}
public void write(String key,Message message){
find(key).forEach(channel -> channel.writeAndFlush(message));
}
public void write(String key, Message message, Predicate<Channel> matcher){
find(key).stream().filter(matcher).forEach(channel -> channel.writeAndFlush(message));
}
public void write(String key, Message message, Collection<String> excludedSet){
find(key).stream().filter(channel -> excludedSet ==null||!ains(channel.attr(ChannelAttr.UID).get())).forEach(channel -> channel.writ eAndFlush(message));
}
public void write(Message message){
this.Receiver(),message);
}
public Collection<Channel>find(String key){
OrDefault(key,EMPTY_LIST);
}
public Collection<Channel>find(String channel){
List<String> channels = Arrays.asList(channel);
return find(key).stream().filter(item -> ains(item.attr(ChannelAttr.CHANNEL).get())).List());
}
}
安卓发送信息接⼝
package;
import Call;
import Field;
import FormUrlEncoded;
import POST;
public interface MessageService {
@POST("/send")
@FormUrlEncoded
Call<Void>send(@Field("sender") String sender,
@Field("receiver") String receiver,
@Field("action") String action,
@Field("content") String content); }

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。