DecipherMiddleware

Transactional Event queues in Oracle Database 23c Free (PL/SQL)

· 617 words · 3 minutes to read · ✍️ Pranav Davar

Oracle Database 23c provides two integrated, robust, and feature-rich queuing systems: AQ (Advanced Queuing) and TxEventQ (Transactional Event Queues). TxEventQ facilitates the implementation of event-based and streaming applications, and it can be integrated with the open-source event streaming platform Kafka.

So, let’s put on our developer hats and create a simple TxEventQ in Oracle Database 23c.

Pre-Requisites

Steps

  1. Create a database user and grant privileges. Log in as a SYSDBA user and execute the commands below using SQL Developer or SQL*Plus.
-- CREATE USER <USERNAME> IDENTIFIED BY <PASSWORD>
CREATE USER dm_teq IDENTIFIED BY *****;
-- GRANT ACCESS AND ROLES TO THE USER
GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO dm_teq;
GRANT EXECUTE ON DBMS_AQ TO dm_teq;
GRANT EXECUTE ON DBMS_AQADM TO dm_teq;
-- OPTIONAL GRANT TABLESPACE TO THE USER
GRANT UNLIMITED TABLESPACE TO dm_teq;
  2. The DBMS_AQADM package provides the procedure CREATE_TRANSACTIONAL_EVENT_QUEUE, which can be used to create a TxEventQ.
PROCEDURE CREATE_TRANSACTIONAL_EVENT_QUEUE (
    queue_name             IN VARCHAR2,
    storage_clause         IN VARCHAR2       DEFAULT NULL,
    multiple_consumers     IN BOOLEAN        DEFAULT FALSE,
    max_retries            IN NUMBER         DEFAULT NULL,
    comment                IN VARCHAR2       DEFAULT NULL, 
    queue_payload_type     IN VARCHAR2       DEFAULT JMS_TYPE,
    queue_properties       IN QUEUE_PROPS_T  DEFAULT NULL,
    replication_mode       IN BINARY_INTEGER DEFAULT NONE);

Log in as the newly created user (**dm_teq**) and execute the commands below.

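BEGIN
    -- Create a multi-consumer queue (topic) with a JMS payload
    dbms_aqadm.create_transactional_event_queue(
        queue_name         => 'demo_topic',
        multiple_consumers => true,
        queue_payload_type => dbms_aqadm.jms_type
    );
    -- Start the queue so that it accepts enqueue and dequeue operations
    dbms_aqadm.start_queue(queue_name => 'demo_topic');
END;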

The above block creates a topic (a queue with multiple consumers) named “demo_topic” with a JMS payload type, and then starts it.
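To confirm that the queue exists and has been started, one option is to query the USER_QUEUES data dictionary view while logged in as dm_teq. This is a minimal sketch, assuming the queue name was stored in uppercase (the default for unquoted names); the value reported in QUEUE_TYPE may differ between releases.

```sql
-- Sketch: check that demo_topic exists and is enabled for enqueue/dequeue.
-- Assumption: the queue name was stored in uppercase (DEMO_TOPIC).
SELECT name,
       queue_type,
       enqueue_enabled,
       dequeue_enabled
FROM   user_queues
WHERE  name = 'DEMO_TOPIC';
```

After start_queue has run, both ENQUEUE_ENABLED and DEQUEUE_ENABLED should report YES.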

Fetch TxEventQ Params

The topic is now created. To fetch one of its parameters, we can use the get_queue_parameter procedure as shown below.

set SERVEROUTPUT on;
declare
    out_var VARCHAR2(100);
begin
    -- Read the SHARD_NUM parameter of the topic created above
    dbms_aqadm.get_queue_parameter(
        queue_name  => 'demo_topic',
        param_name  => 'SHARD_NUM',
        param_value => out_var
    );
    dbms_output.put_line(out_var);
end;

Output: the value below is the number of shards (partitions) of the topic.

5

PL/SQL procedure successfully completed.

Add a Subscriber to the TxEventQ

Execute the PL/SQL block below to add a subscriber to demo_topic (the TxEventQ).

DECLARE
    subscriber SYS.aq$_agent;
BEGIN
    -- Add the subscriber OMQ to the subscriber list of demo_topic
    subscriber := SYS.aq$_agent('OMQ', NULL, NULL);
    DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name => 'demo_topic',
        subscriber => subscriber
    );
END;

Query the list of subscribers for the topic.

-- view name : aq$<queue_name>_s
select * from aq$demo_topic_s;

Output:

Publish a message on the topic

Diagram: Publisher --> demo_topic

We created the topic with the JMS message payload type, so every message we publish must also be a JMS message.

DECLARE
    enqueue_options    dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);
    message            SYS.AQ$_JMS_MESSAGE;
BEGIN
    -- Build a JMS text message and set its body
    message := SYS.AQ$_JMS_MESSAGE.construct(DBMS_AQ.JMS_TEXT_MESSAGE);
    message.set_text('hello world');
    dbms_aq.enqueue(
        queue_name         => 'demo_topic',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );
    -- Print the generated message id
    DBMS_OUTPUT.PUT_LINE(message_handle);
    -- Commit so that the message becomes visible to other sessions
    COMMIT;
END;

The message has been published to the topic; let’s validate it. The status of a message is available from the view “aq$<queue_name>”. Run the query below to check it.

select msg_id, user_data,msg_state,enq_timestamp,deq_timestamp from aq$demo_topic;

Output:

Subscribe to messages from the topic

Diagram: Publisher --> demo_topic --> subscriber OMQ

Let’s use the subscriber created in the earlier step to extract the message from the topic.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message            sys.aq$_jms_message;
    message_handle     RAW(16);
    message_text       VARCHAR2(100);
BEGIN
    -- Read from the start of the queue on behalf of subscriber OMQ
    dequeue_options.navigation    := DBMS_AQ.FIRST_MESSAGE;
    dequeue_options.consumer_name := 'OMQ';
    dbms_aq.dequeue(
        queue_name         => 'demo_topic',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );

    -- Extract and print the text body of the JMS message
    message.get_text(message_text);
    dbms_output.put_line(message_text);
    -- Commit so that the dequeue is made permanent
    COMMIT;
END;

Output:

hello world
    
PL/SQL procedure successfully completed.

This is a simple example of dequeuing a message from a TxEventQ. Let’s run the query below again to check the status of the message.

select msg_id, user_data,msg_state,enq_timestamp,deq_timestamp from aq$demo_topic;

The state has now changed from READY to PROCESSED.
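By default, a dequeue call waits until a message arrives, so running the block again on an empty topic would block the session. The following is a minimal sketch of a non-blocking variant, assuming the same demo_topic and OMQ subscriber as above. It sets the wait option to DBMS_AQ.NO_WAIT and handles ORA-25228 (timeout or end-of-fetch during message dequeue); the exception name no_messages is a local name chosen for this illustration.

```sql
SET SERVEROUTPUT ON;
DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message            sys.aq$_jms_message;
    message_handle     RAW(16);
    message_text       VARCHAR2(100);
    -- Local exception name (illustrative) mapped to ORA-25228
    no_messages        EXCEPTION;
    PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
    dequeue_options.consumer_name := 'OMQ';
    -- Return immediately instead of waiting for a message
    dequeue_options.wait := DBMS_AQ.NO_WAIT;

    dbms_aq.dequeue(
        queue_name         => 'demo_topic',
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle
    );

    message.get_text(message_text);
    dbms_output.put_line(message_text);
    COMMIT;
EXCEPTION
    WHEN no_messages THEN
        dbms_output.put_line('No message available for subscriber OMQ');
END;
/
```

A fixed number of seconds can be assigned to dequeue_options.wait instead of DBMS_AQ.NO_WAIT if a bounded wait is preferred.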

This is a simple walkthrough to configure and use TxEventQ in Oracle Database 23c with the help of PL/SQL.
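To rerun the walkthrough from a clean state, the demo objects can be removed again. The block below is a sketch only: it assumes the procedure DBMS_AQADM.DROP_TRANSACTIONAL_EVENT_QUEUE is available in your release and that it is run as dm_teq; check the DBMS_AQADM reference for your version before relying on it.

```sql
-- Sketch: tear down the demo topic. Run as dm_teq.
BEGIN
    -- Remove the OMQ subscriber added earlier
    dbms_aqadm.remove_subscriber(
        queue_name => 'demo_topic',
        subscriber => sys.aq$_agent('OMQ', NULL, NULL)
    );
    -- Stop the queue before dropping it
    dbms_aqadm.stop_queue(queue_name => 'demo_topic');
    -- Assumption: this procedure exists in your database release
    dbms_aqadm.drop_transactional_event_queue(queue_name => 'demo_topic');
END;
/
```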

Please share your valuable feedback! 😄😄😄

References:

  1. https://www.oracle.com/database/free/download/
  2. https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html
  3. https://docs.oracle.com/en/database/oracle/oracle-database/21/adque/Programmatic_Interfaces_Sharded_Queues.html
