Sunday, April 20, 2014

Using Oracle AQ with Oracle SOA 11g & OSB 11g

Oracle Advanced Queuing (AQ) is the message queuing mechanism built into Oracle Database, where it serves as the queuing infrastructure.  With AQ, different applications can communicate and exchange messages through Oracle database queues.  Oracle AQ provides an API to enqueue messages to, and dequeue messages from, queues in the Oracle database.

Oracle AQ is an integral part of Oracle Database, so nothing extra needs to be installed.  This tutorial uses Oracle XE 10g.  Oracle AQ is exposed through two built-in PL/SQL packages in Oracle Database: DBMS_AQADM and DBMS_AQ.

The DBMS_AQADM package provides PL/SQL procedures for AQ configuration and administration.  Its basic operations include creating a queue table, creating a queue, starting a queue, and stopping a queue.

The DBMS_AQ package provides the interface through which applications use AQ.  ENQUEUE and DEQUEUE are its two procedures for enqueuing messages to and dequeuing messages from an AQ queue.

Once an AQ queue is created and started, one application, such as a PL/SQL procedure, can enqueue a message into the queue, and another application, such as a SOA component, can dequeue the message from it.

The steps below show how to create an AQ queue from scratch and how to enqueue a message into it.  They are followed by the steps for creating a SOA component and an OSB proxy service that dequeue messages from the AQ queue.


Setup AQ queue in Oracle XE 10g


Setting up an AQ queue in Oracle Database takes four steps:


1.     Create a database user and grant it the roles AQ_ADMINISTRATOR_ROLE and AQ_USER_ROLE.  Log into Oracle XE 10g as sysdba, create a user called EVENT_USER, and grant the required privileges to this user.

sqlplus sys@localhost:1521/xe as sysdba
create user EVENT_USER identified by oracle;
grant create session, resource, AQ_ADMINISTRATOR_ROLE,AQ_USER_ROLE to EVENT_USER;
grant execute on DBMS_AQADM to EVENT_USER;
grant execute on DBMS_AQ to EVENT_USER;
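
Before moving on, it can be worth confirming that the roles were actually granted. The following is a small sketch, assuming the password and connect string used in step 1; it connects as EVENT_USER from the same SQL*Plus session and lists the roles visible to that user.

```sql
-- Assumes the EVENT_USER/oracle credentials and the XE connect string from step 1.
CONNECT EVENT_USER/oracle@localhost:1521/xe

-- Should list AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE and RESOURCE.
SELECT granted_role FROM user_role_privs;
```

Staying connected as EVENT_USER is convenient here, because the later statements create and query objects in that user's schema.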

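2.     Create a queue table.  Before that, create an object type called invoice_type, which is used as the payload type of the messages in the AQ queue.  Run these statements as EVENT_USER, since the type is created without a schema prefix and the verification queries use the USER_ views.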


create or replace type invoice_type as object(
      invoice_id number,
      invoice_number varchar2(60),
      vendor_id number
);
/

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
    queue_table        => 'EVENT_USER.invoice_queue_table',
    queue_payload_type => 'EVENT_USER.invoice_type');
END;
/

SELECT queue_table,type,object_type,recipients FROM USER_QUEUE_TABLES;

3.     After the queue table is created, create a queue in it.
BEGIN
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'EVENT_USER.invoice_queue',
    queue_table => 'EVENT_USER.invoice_queue_table');
END;
/

select name, queue_table, queue_type from user_queues;
Executing the scripts above creates several database objects for this AQ queue, including the queue table itself and the aq$invoice_queue_table view that is queried later in this tutorial.



4.     The last step is to start the queue by executing the START_QUEUE procedure from DBMS_AQADM.
exec DBMS_AQADM.START_QUEUE('EVENT_USER.invoice_queue');
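
When repeating the tutorial it helps to be able to undo the setup. The following is a minimal teardown sketch, assuming it is run as EVENT_USER and that the object names are exactly those created in steps 2 to 4. It stops the queue, drops it, and then drops the queue table and the payload type.

```sql
-- Teardown sketch: reverses steps 2 to 4. Object names are the ones used in this tutorial.
BEGIN
  -- A queue has to be stopped before it can be dropped.
  DBMS_AQADM.STOP_QUEUE(queue_name => 'EVENT_USER.invoice_queue');
  DBMS_AQADM.DROP_QUEUE(queue_name => 'EVENT_USER.invoice_queue');
  -- Drop the queue table once its queue is gone.
  DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'EVENT_USER.invoice_queue_table');
END;
/

-- The payload type can be dropped after the queue table that depends on it.
DROP TYPE invoice_type;
```

Any messages still in the queue are lost when the queue table is dropped, so this is only appropriate for a test setup like the one here.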


Enqueue a message with the DBMS_AQ package
The queue just created and started is now ready to receive messages.  In this example the ENQUEUE procedure from the DBMS_AQ package is used to enqueue a message into the AQ queue.

DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            invoice_type;
BEGIN
  -- Build the payload: invoice_id, invoice_number, vendor_id
  message := invoice_type(10, 'ABCD123', 80);

  -- The message becomes visible to consumers only after COMMIT
  enqueue_options.VISIBILITY := DBMS_AQ.ON_COMMIT;
  enqueue_options.SEQUENCE_DEVIATION := null;
  -- The message never expires
  message_properties.EXPIRATION := DBMS_AQ.NEVER;

  DBMS_AQ.ENQUEUE (
    queue_name         => 'invoice_queue',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);

  COMMIT;
END;
/

The message just enqueued can be browsed using the queries below:
SELECT count(*) FROM aq$invoice_queue_table;

SELECT user_data FROM   aq$invoice_queue_table;
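
The queue can also be checked end to end from PL/SQL before any SOA component is involved. The following is a minimal sketch of the matching DBMS_AQ.DEQUEUE call, assuming it is run as EVENT_USER against the queue created above. The exception name no_messages is introduced here for illustration; it is mapped to ORA-25228, the error raised when no message is available within the wait time.

```sql
SET SERVEROUTPUT ON

DECLARE
  dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  message_handle     RAW(16);
  message            invoice_type;
  -- Illustrative name for ORA-25228 (timeout or end-of-fetch during dequeue).
  no_messages        EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  -- Return immediately instead of blocking when the queue is empty.
  dequeue_options.wait         := DBMS_AQ.NO_WAIT;
  dequeue_options.navigation   := DBMS_AQ.FIRST_MESSAGE;
  -- REMOVE consumes the message; DBMS_AQ.BROWSE would leave it in the queue.
  dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;

  DBMS_AQ.DEQUEUE (
    queue_name         => 'invoice_queue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);

  DBMS_OUTPUT.PUT_LINE('invoice_id=' || message.invoice_id ||
                       ', invoice_number=' || message.invoice_number ||
                       ', vendor_id=' || message.vendor_id);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    DBMS_OUTPUT.PUT_LINE('No message available in invoice_queue.');
END;
/
```

Because this block removes the message, the enqueue block needs to be run again before testing the SOA or OSB consumers described next.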


Consume the message in AQ queue from Oracle SOA component
An Oracle SOA component (BPEL or Mediator) can consume messages from an AQ queue (dequeue) or produce messages into it (enqueue).  Oracle SOA does this through the JCA adapter for AQ.

Some configuration is needed before the AQ adapter can be used.
1.     Create the data source in the WebLogic console via Domain Structure->Services->Data Sources.  
This data source points to the Oracle database where the AQ queue resides, which in this example is Oracle XE 10g running on the local machine.  Here the AQEvent data source is created and its JNDI name is set to jdbc/AQEvent.



2.     Create an AQAdapter instance in the WebLogic console via Domain Structure->Deployments->Deployments->AQAdapter.  
A new AQ adapter instance is created as eis/AQ/LocalEventConnection, with its XADataSourceName set to jdbc/AQEvent.



3.     Use the AQAdapter in JDeveloper.
Here the AQAdapter is used as a service that consumes messages from the AQ queue specified in the adapter's .jca file.  The .jca file also specifies the AQ connection factory, which was configured as eis/AQ/LocalEventConnection in the previous step.


<adapter-config name="AQEventService" adapter="AQ Adapter" wsdlLocation="AQEventService.wsdl" xmlns="http://platform.integration.oracle/blocks/adapter/fw/metadata">

  <connection-factory location="eis/AQ/LocalEventConnection" UIConnectionName="LocalEventConnectionen"/>
  <endpoint-activation portType="Dequeue_ptt" operation="Dequeue">
    <activation-spec className="oracle.tip.adapter.aq.inbound.AQDequeueActivationSpec">
      <property name="SchemaValidation" value="false"/>
      <property name="QueueName" value="INVOICE_QUEUE"/>
      <property name="DatabaseSchema" value="EVENT_USER"/>
    </activation-spec>
  </endpoint-activation>

</adapter-config>



After the SOA project is deployed to the server, the AQAdapter starts polling the INVOICE_QUEUE queue for messages.


Consume the message in AQ queue from OSB
OSB also uses the JCA adapter to poll for messages in the AQ queue.  Here the AQ .jca file created for SOA is reused to generate a proxy service.  It is assumed that the same data source and AQAdapter instance are configured on the OSB server.

Create an OSB project named AQTestProject.
Copy the following files from the JDeveloper project to the OSB project:
                  AQEventService_aq.jca
                  AQEventService.wsdl
                  xsd/EVENT_USER_INVOICE_TYPE.xsd



Generate the OSB proxy service from the .jca file.


The generated proxy service looks like this:




Design the message flow as shown below:


After the OSB project is deployed to the server, it starts consuming messages from the AQ queue.