通过JMS监听OracleAQ,在数据库变化时触发执⾏Java程序
版权声明:本⽂为博主原创⽂章,未经博主允许不得转载。 blog.csdn/scorpio3k/article/details/49406209
环境说明
本实验环境基于Oracle 12C和JDK1.8,其中Oracle 12C⽀持多租户特性,相较于之前的Oracle版本,使⽤‘C##⽤户名‘表⽰⽤户,例如如果数据库⽤户叫kevin,则登陆时使⽤C##kevin进⾏登陆。
⼀、Oracle⾼级消息队列AQ
Oracle AQ是Oracle中的消息队列,是Oracle中的⼀种⾼级应⽤,每个版本都在不断的加强,使⽤DBMS_AQ系统包进⾏相应的操作,是Oracle的默认组件,只要安装了Oracle数据库就可以使⽤。使⽤AQ可以在多个Oracle数据库、Oracle与Java、C等系统中进⾏数据传输。
下⾯分步骤说明如何创建Oracle AQ
1. 创建消息负荷payload
Oracle AQ中传递的消息被称为有效负荷(payloads),格式可以是⽤户⾃定义对象或XMLType或ANYDATA。本例中我们创建⼀个简单的对象类型⽤于传递消息。
create type demo_queue_payload_type as object (message varchar2(4000));
2. 创建队列表
队列表⽤于存储消息,在⼊队时⾃动存⼊表中,出队时⾃动删除。使⽤DBMS_AQADM包进⾏数据表的创建,只需要写表名,同时设置相应的属性。对于队列需要设置multiple_consumers为false,如果使⽤发布/订阅模式需要设置为true。
begin
ate_queue_table(
queue_table  => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => false
)
;
end;
执⾏完后可以查看oracle表中⾃动⽣成了demo_queue_table表,可以查看影响⼦段(含义⽐较清晰)。
3. 创建队列并启动
创建队列并启动队列:
begin
ate_queue (
queue_name  => 'demo_queue',
queue_table => 'demo_queue_table'
);
dbms_aqadm.start_queue(
queue_name  =>  'demo_queue'
);
end;
⾄此,我们已经创建了队列有效负荷,队列表和队列。可以查看以下系统创建了哪些相关的对象:
SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';oracle 时间转换
OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE
我们看到⼀个队列带出了⼀系列⾃动⽣成对象,有些是被后⾯直接⽤到的。不过有趣的是,创建了第⼆个队列。这就是所谓的异常队列(exception queue)。如果AQ⽆法从我们的队列接收消息,将记录在该异常队列中。
消息多次处理出错等情况会⾃动转移到异常的队列,对于异常队列如何处理⽬前笔者还没有到相应的写法,因为我使⽤的场景并不要求消息必须⼀对⼀的被处理,只要起到通知的作⽤即可。所以如果消息转移到异常队列,可以执⾏清空队列表中的数据
delete from demo_queue_table;
4. 队列的停⽌和删除
如果需要删除或重建可以使⽤下⾯的⽅法进⾏操作:
BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
5. ⼊队消息
⼊列操作是⼀个基本的事务操作(就像往队列表Insert),因此我们需要提交。
declare
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
o_payload := demo_queue_payload_type('what is you name ?');
queue(
queue_name  => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end;
通过SQL语句查看消息是否正常⼊队:
select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;
6. 出队消息
使⽤Oracle进⾏出队操作,我没有实验成功(不确定是否和DBMS_OUTPUT的执⾏权限有关),代码如下,读者可以进⾏调试:
declare
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'***** Browse message is [' || ssage || ']****'
);
end;
⼆、Java使⽤JMS监听并处理Oracle AQ队列
Java使⽤JMS进⾏相应的处理,需要使⽤Oracle提供的jar,在Oracle安装⽬录可以到:在linux中可以使⽤find命令进⾏查,例如find `pwd` -name 'jmscommon.jar'
需要的jar为:
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar
1. 创建连接参数类
实际使⽤时可以把参数信息配置在properties⽂件中,使⽤Spring进⾏注⼊。
/**
*
* @author 李⽂锴
*  连接参数信息
*
*/
public class JmsConfig {
public String username = "c##kevin";
public String password = "a111111111";
public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
public String queueName = "demo_queue";
}
2. 创建消息转换类
因为消息载荷是Oracle数据类型,需要提供⼀个转换⼯⼚类将Oracle类型转换为Java类型。
import java.sql.SQLException;
import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;
/**
*
* @author 李⽂锴
* 数据类型转换类
*
*/
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
MutableStruct _struct;
// 12表⽰字符串
static int[] _sqlType = { 12 };
static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();
public static CustomDatumFactory getFactory() {
return _MessageFactory;
}
public QUEUE_MESSAGE_TYPE() {
_struct = new MutableStruct(new Object[1], _sqlType, _factory);
}
public Datum toDatum(OracleConnection c) throws SQLException {
return _Datum(c, _SQL_NAME);
}
public CustomDatum create(Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getContent() throws SQLException {
return (String) _Attribute(0);
}
}
3. 主类进⾏消息处理

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。