Oracle Database 23c provides two integrated, feature-rich queuing systems: AQ (Advanced Queuing) and TxEventQ (Transactional Event Queues). TxEventQ facilitates the implementation of event-based and streaming applications, and it can be integrated with the open source event streaming platform Kafka.
So, let’s put on our developer hats and create a simple TxEventQ in Oracle Database 23c.

Prerequisites
- Oracle Database 23c Free
- User with SYSDBA privileges
Steps
- Create a database user and grant privileges. Log in as a SYSDBA user and run the commands below in SQL Developer or SQL*Plus.
-- CREATE USER <USERNAME> IDENTIFIED BY <PASSWORD>
CREATE USER dm_teq IDENTIFIED BY *****;
-- GRANT ACCESS AND ROLES TO THE USER
GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO dm_teq;
GRANT EXECUTE ON DBMS_AQ TO dm_teq;
GRANT EXECUTE ON DBMS_AQADM TO dm_teq;
-- OPTIONAL GRANT TABLESPACE TO THE USER
GRANT UNLIMITED TABLESPACE TO dm_teq;
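As an optional check, the grants above can be verified from the same SYSDBA session. This is a minimal sketch, not part of the original setup: it assumes the user name was created unquoted (so it is stored as DM_TEQ) and that the session can read the DBA_* dictionary views.

```sql
-- Roles granted to the user (expect CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE)
SELECT granted_role
FROM   dba_role_privs
WHERE  grantee = 'DM_TEQ';

-- Object privileges (expect EXECUTE on DBMS_AQ and DBMS_AQADM)
SELECT table_name, privilege
FROM   dba_tab_privs
WHERE  grantee = 'DM_TEQ';

-- System privileges (UNLIMITED TABLESPACE appears only if the optional grant was run)
SELECT privilege
FROM   dba_sys_privs
WHERE  grantee = 'DM_TEQ';
```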
- The DBMS_AQADM package provides the procedure CREATE_TRANSACTIONAL_EVENT_QUEUE, which can be used to create a TxEventQ.
PROCEDURE CREATE_TRANSACTIONAL_EVENT_QUEUE (
queue_name IN VARCHAR2,
storage_clause IN VARCHAR2 DEFAULT NULL,
multiple_consumers IN BOOLEAN DEFAULT FALSE,
max_retries IN NUMBER DEFAULT NULL,
comment IN VARCHAR2 DEFAULT NULL,
queue_payload_type IN VARCHAR2 DEFAULT JMS_TYPE,
queue_properties IN QUEUE_PROPS_T DEFAULT NULL,
replication_mode IN BINARY_INTEGER DEFAULT NONE);
Log in as the newly created user (**dm_teq**) and run the block below.
BEGIN
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'demo_topic',
        multiple_consumers => true,
        queue_payload_type => dbms_aqadm.jms_type
    );
    -- Start the queue
    dbms_aqadm.start_queue(queue_name => 'demo_topic');
END;
The block above creates a topic (a queue with multiple consumers) named “demo_topic” with the JMS payload type, and then starts it.
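To confirm that the topic exists and has been started, the queue can be looked up in the data dictionary. This is a small sketch that assumes the topic is listed in the USER_QUEUES view of the dm_teq schema under its upper-case name DEMO_TOPIC.

```sql
-- Run as dm_teq: the ENQUEUE_ENABLED and DEQUEUE_ENABLED columns
-- show whether start_queue has taken effect.
SELECT name, queue_type, enqueue_enabled, dequeue_enabled
FROM   user_queues
WHERE  name = 'DEMO_TOPIC';
```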
Fetch TxEventQ Params

Now that the topic is created, we can read its parameters with the get_queue_parameter procedure, as shown below.
set SERVEROUTPUT on;
declare
    out_var VARCHAR2(100);
begin
    -- The queue name must match the topic created earlier
    dbms_aqadm.get_queue_parameter(
        queue_name  => 'demo_topic',
        param_name  => 'SHARD_NUM',
        param_value => out_var
    );
    dbms_output.put_line(out_var);
end;
Output: the value of SHARD_NUM, that is, the number of shards (partitions) of the topic.
5
PL/SQL procedure successfully completed.
Add a Subscriber to the TxEventQ
Execute the PL/SQL block below to add a subscriber to demo_topic (the TxEventQ).
DECLARE
    subscriber SYS.aq$_agent;
BEGIN
    -- Add the subscriber OMQ to the subscriber list
    subscriber := SYS.aq$_agent('OMQ', NULL, NULL);
    DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'demo_topic',
                              subscriber => subscriber);
END;
Query the list of subscribers for the topic.
-- view name : aq$<queue_name>_s
select * from aq$demo_topic_s;

Publish a message on the topic
We created the topic with the JMS message type, so the payload must also be a JMS message.
DECLARE
    enqueue_options    dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);
    message            SYS.AQ$_JMS_MESSAGE;
BEGIN
    -- Build a JMS text message and set its body
    message := SYS.AQ$_JMS_MESSAGE.construct(DBMS_AQ.JMS_TEXT_MESSAGE);
    message.set_text('hello world');
    dbms_aq.enqueue(
        queue_name         => 'demo_topic',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );
    DBMS_OUTPUT.PUT_LINE(message_handle);
    -- Enqueue is transactional: commit so other sessions can see the message
    COMMIT;
END;
The message has been published on the topic, so let’s validate it. The status of a message is available from the view “aq$<queue_name>”. Run the query below to check the status of the message.
select msg_id, user_data,msg_state,enq_timestamp,deq_timestamp from aq$demo_topic;

Dequeue messages from the topic
Let’s use the subscriber created in the earlier step to dequeue the message from the topic.
DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message            sys.aq$_jms_message;
    message_handle     RAW(16);
    message_text       VARCHAR2(100);
BEGIN
    dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
    -- Same name as the subscriber added earlier
    dequeue_options.consumer_name := 'OMQ';
    dbms_aq.dequeue(
        queue_name         => 'demo_topic',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );
    message.get_text(message_text);
    dbms_output.put_line(message_text);
    -- Dequeue is transactional: commit to complete it
    COMMIT;
END;
Output:
hello world
PL/SQL procedure successfully completed.
This is a simple example to dequeue a message from TxEventQ. Let’s run the below query to know the status of the message.
select msg_id, user_data,msg_state,enq_timestamp,deq_timestamp from aq$demo_topic;

The state has now changed from READY to PROCESSED.
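One thing to keep in mind is that a dequeue call waits indefinitely by default when no message is available for the consumer. The sketch below is a variation on the dequeue block, not the original author's code: it sets a wait time and handles ORA-25228, the error raised when the wait times out. The 5-second timeout and the exception name no_messages are illustrative choices.

```sql
SET SERVEROUTPUT ON;
DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message            sys.aq$_jms_message;
    message_handle     RAW(16);
    message_text       VARCHAR2(100);
    -- Hypothetical name for ORA-25228 (timeout or end-of-fetch during dequeue)
    no_messages        EXCEPTION;
    PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
    dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
    dequeue_options.consumer_name := 'OMQ';
    dequeue_options.wait          := 5;  -- seconds; DBMS_AQ.NO_WAIT returns immediately
    dbms_aq.dequeue(
        queue_name         => 'demo_topic',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );
    message.get_text(message_text);
    dbms_output.put_line(message_text);
    COMMIT;
EXCEPTION
    WHEN no_messages THEN
        dbms_output.put_line('No message available within the wait time');
END;
/
```

Running this after the earlier message has already been dequeued should take the exception branch instead of blocking the session.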
This is a simple walkthrough to configure and use TxEventQ in Oracle Database 23c with the help of PL/SQL.
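To remove the demo topic afterwards, the queue can be stopped and dropped from the dm_teq session. This is a minimal cleanup sketch and an addition to the walkthrough; it assumes the DROP_TRANSACTIONAL_EVENT_QUEUE procedure of DBMS_AQADM with its force parameter, as available in recent releases.

```sql
BEGIN
    -- Stop enqueue and dequeue on the topic before dropping it
    dbms_aqadm.stop_queue(queue_name => 'demo_topic');
    -- force => TRUE also removes the underlying queue objects and any remaining messages
    dbms_aqadm.drop_transactional_event_queue(
        queue_name => 'demo_topic',
        force      => TRUE
    );
END;
/
```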
Please share your valuable feedback! 😄😄😄