使⽤Java开发EMQXMQTT服务器插件
从 v4.1 版本开始,提供了专门的多语⾔⽀持插件,现已⽀持使⽤其他编程语⾔来处理 EMQ X 中的钩⼦事件,开发者可以使⽤ Python 或者 Java 快速开发⾃⼰的插件,在官⽅功能的基础上进⾏扩展,满⾜⾃⼰的业务场景。例如:
验证某客户端的登录权限:客户端连接时触发对应函数,通过参数获取客户端信息后通过读取数据库、⽐对等操作判定是否有登录权限
记录客户端在线状态与上下线历史:客户端状态变动时触发对应函数,通过参数获取客户端信息,改写数据库中客户端在线状态
校验某客户端的 PUB/SUB 的操作权限:发布/订阅时触发对应函数,通过参数获取客户端信息与当前主题,判定客户端是否有对应的操作权限
处理会话 (Sessions) 和消息 (Message) 事件,实现订阅关系与消息处理/存储:消息发布、状态变动时触发对应函数,获取当前客户端信息、消息状态与消息内容,转发到Kafka 或数据库进⾏存储。
注:消息(Message) 类钩⼦,仅在企业版中⽀持。
Python 和 Java 驱动基于进程间通信实现,本⾝具有⾮常⾼的吞吐性能,本⽂以 Java 拓展为例介绍 EMQ X 跨语⾔拓展使⽤⽅式。
Java 拓展使⽤⽰例
要求
EMQ X 所在服务器需安装 JDK 1.8 以上版本
开始使⽤
1. 创建 Java 项⽬
2. 下载和⽂件
3. 添加qx.extension.jar和erlport.jar到项⽬依赖
4. 复制examples/SampleHandler.java到您的项⽬中
5. 根据 SDK SampleHandler.java中的⽰例编写业务代码,确保能够成功编译
部署
编译所有源代码后,需要将sdk和代码⽂件部署到 EMQ X 中:
1. 复制sion.jar到emqx/data/extension⽬录
2. 将编译后的.class⽂件,例如SampleHandler.class复制到emqx/data/extension ⽬录
3. 修改emqx/etc/plugins/emqx_f配置⽂件:
exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler
启动emqx_extension_hook插件,如果配置错误或 Java 代码编写错误将⽆法正常启动。启动后尝试建⽴ MQTT 连接并观察业务运⾏情况。
⽰例
以下为⽰例程序,该程序继承⾃ SDK 中的DefaultCommunicationHandler类。该⽰例代码演⽰了如何挂载 EMQ X 系统中所有的钩⼦:
sion.java.handler.*;
sion.dec.*;
sion.java.handler.ActionOptionConfig.Keys;
public class SampleHandler extends DefaultCommunicationHandler {
@Override
public ActionOptionConfig getActionOption() {
ActionOptionConfig option = new ActionOptionConfig();
option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
return option;
}
// Clients
@Override
public void onClientConnect(ConnInfo connInfo, Property[] props) {
}
@Override
public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
}
@Override
public void onClientConnected(ClientInfo clientInfo) {
}
@Override
public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
}
// 判定认证结果,返回 true 或 false
@Override
public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
return true;
}
// 判定 ACL 检查结果,返回 true 或 false
@Override
public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
return true;
}
@Override
public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
}
@Override
public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
}
// Sessions
@Override
public void onSessionCreated(ClientInfo clientInfo) {
}
@Override
public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
}
@Override
public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
}
@Override
public void onSessionResumed(ClientInfo clientInfo) {
}
@Override
public void onSessionDiscarded(ClientInfo clientInfo) {
}
@Override
public void onSessionTakeovered(ClientInfo clientInfo) {
}
@Override
public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
}
// Messages
@Override
public Message onMessagePublish(Message message) {
return message;
}
@Override
public void onMessageDropped(Message message, Reason reason) {
}
@Override
public void onMessageDelivered(ClientInfo clientInfo, Message message) {
}
@Override
public void onMessageAcked(ClientInfo clientInfo, Message message) {
}
}
SampleHandler主要包含两部分:
1. 重载了getActionOption⽅法。该⽅法对消息(Message)相关的钩⼦进⾏配置,指定了需要⽣效的主题列表。
配置项对应钩⼦
MESSAGE_PUBLISH_TOPICS message_publishpython转java代码
MESSAGE_DELIVERED_TOPICS message_delivered
MESSAGE_ACKED_TOPICS message_acked
MESSAGE_DROPPED_TOPICS message_dropped
2. 重载了on<hookName>⽅法,这些⽅法是实际处理钩⼦事件的回调函数,函数命名⽅式为各个钩⼦名称变体后前⾯加on前缀,变体⽅式为钩⼦名称去掉下划线后使⽤骆驼拼
写法(CamelCase),例如,钩⼦client_connect对应的函数名为onClientConnect。 EMQ X 客户端产⽣的事件,例如:连接、发布、订阅等,都会最终分发到这些钩⼦事件回调函数上,然后回调函数可对各属性及状态进⾏相关操作。⽰例程序中仅对各参数进⾏了打印输出。如果只关⼼部分钩⼦事件,只需对这部分钩⼦事件的回调函数进⾏重载即可,不需要重载所有回调函数。
各回调函数的执⾏时机和⽀持的钩⼦列表与 EMQ X 内置的钩⼦完全⼀致,参见:
在实现⾃⼰的扩展程序时,最简单的⽅式也是继承DefaultCommunicationHandler⽗类,该类对各钩⼦与回调函数的绑定进⾏了封装,并进⼀步封装了回调函数涉及到的参数数据结构,以⽅便快速上⼿使⽤。
进阶开发
如果对 Java 扩展程序的可控性要求更⾼,DefaultCommunicationHandler类已⽆法满⾜需求时,可以通过实现CommunicationHandler接⼝,从更底层控制代码逻辑,编写更灵活的扩展程序。
sion.java.handler;
public interface CommunicationHandler {
public Object init();
public void deinit();
}
init()⽅法:⽤于初始化,声明扩展需要挂载哪些钩⼦,以及挂载的配置deinit()⽅法:⽤于注销。
详细数据格式说明,参见。
版权声明:本⽂为原创,转载请注明出处。
原⽂链接:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。