Tuesday, November 10, 2015

Creating Queue, Topic and Durable Topic Subscribers using ESB Proxy Services


A WSO2 ESB proxy service receives messages and can process them to achieve a particular objective before forwarding them to a given endpoint. Among ESB's many capabilities, the ability to carry messages in different formats over different transports is prominent. The JMS transport is one of the many transport types ESB supports. It requires an active JMS server to send and receive messages, and WSO2 Message Broker fills that role here.

Message Broker (MB) receives messages from publishers and delivers them to subscribers registered for queues and topics. Subscribers can be of several types, such as queue, topic, durable, and shared.

In this tutorial, we will discuss how to use proxy services to create publishers and subscribers of different types that publish to and subscribe from the message broker.

I am using,


Setting up Message Broker

1. Download and install MB according to the instructions mentioned in Getting Started.

2. Before starting the MB server, change the port offset so that you can start two WSO2 products simultaneously in the same environment without any port conflicts.

Go to <MB_HOME>/repository/conf/carbon.xml and change the port offset as follows.

<Ports>
   <!-- Ports offset. This entry will set the value of the ports defined below to
 the defined value + Offset.
 e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
 -->
   <Offset>1</Offset>
   ....
</Ports>
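
To make the offset arithmetic concrete, here is a minimal Python sketch. It is only an illustration, not part of either product. The helper name `effective_port` is hypothetical, and the default AMQP port of 5672 is my assumption about a stock MB installation; 9443 is the HTTPS port quoted in the carbon.xml comment.

```python
# Illustrative sketch only: how a Carbon port offset shifts the default ports.
# The default values below are assumptions about a stock installation.
DEFAULT_PORTS = {
    "https_console": 9443,  # HTTPS port quoted in the carbon.xml comment
    "amqp": 5672,           # assumed default AMQP port of Message Broker
}


def effective_port(default_port: int, offset: int) -> int:
    """Return the port a server binds to once the offset is applied."""
    if offset < 0:
        raise ValueError("offset must be non-negative")
    return default_port + offset


if __name__ == "__main__":
    # With <Offset>1</Offset>, every default port moves up by one.
    for name, port in DEFAULT_PORTS.items():
        print(name, effective_port(port, 1))
```

With an offset of 1, the AMQP port in this sketch works out to the 5673 that appears in the broker connection URLs later in this post.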

Setting up ESB

1. Go to <ESB_HOME>/repository/conf/axis2/axis2.xml and enable the JMS Transport of ESB to communicate with MB.

<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">        
  <parameter name="myTopicConnectionFactory" locked="false">
   <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>            
   <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>         
   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>            
   <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>        
  </parameter>

  <parameter name="myQueueConnectionFactory" locked="false">            
   <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>           
   <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>            
   <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>           
   <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>        
  </parameter>

 .... 
 
</transportReceiver>

2. Also, uncomment the JMS transport sender in the same file.

<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
 

3. Open the <ESB_HOME>/repository/conf/jndi.properties file and configure it as follows.

# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]

connectionfactory.QueueConnectionFactory = amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5673'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@carbon/carbon?brokerlist='tcp://localhost:5673'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.WeatherForecast = WeatherForecast

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = News.Sports
 

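Note: 
  • The name in each connectionfactory.[jndiname] entry must be the same as the transport.jms.ConnectionFactoryJNDIName value given in the axis2.xml file.
  • The port used to connect to MB (5673) depends on the port offset we configured: the default AMQP port 5672 plus the offset of 1.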
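
A mismatch between the two files is easy to miss, so here is a small Python sketch that cross-checks them. It is a rough helper under stated assumptions, not a WSO2 tool: the function names are hypothetical, it reads the file contents as plain strings, and it only looks at the `transport.jms.ConnectionFactoryJNDIName` parameters and the `connectionfactory.*` lines shown above.

```python
import re
import xml.etree.ElementTree as ET

PREFIX = "connectionfactory."


def jndi_names_in_axis2(axis2_xml: str) -> set:
    """Collect every JNDI factory name referenced by the JMS transport config."""
    root = ET.fromstring(axis2_xml)
    return {
        p.text.strip()
        for p in root.iter("parameter")
        if p.get("name") == "transport.jms.ConnectionFactoryJNDIName" and p.text
    }


def factories_in_jndi_properties(text: str) -> dict:
    """Map each connectionfactory.[jndiname] entry to its connection URL."""
    factories = {}
    for line in text.splitlines():
        line = line.strip()
        if line.startswith(PREFIX) and "=" in line:
            # Split on the first "=" only; the URL itself contains "=".
            key, value = line.split("=", 1)
            factories[key.strip()[len(PREFIX):]] = value.strip()
    return factories


def missing_factories(axis2_xml: str, jndi_text: str) -> set:
    """Names used in axis2.xml that jndi.properties does not define."""
    return jndi_names_in_axis2(axis2_xml) - set(factories_in_jndi_properties(jndi_text))


def broker_port(connection_url: str):
    """Extract the TCP port from the brokerlist part of the URL, if present."""
    match = re.search(r"tcp://[^:'/]+:(\d+)", connection_url)
    return int(match.group(1)) if match else None
```

If `missing_factories` returns an empty set for your two files, the names line up. `broker_port` lets you compare the port in the connection URL against the offset you configured in MB.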

4. Copy the following jar files from the <MB_HOME>/client-lib folder to the <ESB_HOME>/repository/components/lib folder:
  • andes-client-2.6.2.jar
  • geronimo-jms_1.1_spec-1.1.0.wso2v1.jar
  • org.wso2.securevault-1.0.0-wso2v2.jar

Now, start ESB.


Setting up JMS Consumers and Publishers

1. Navigate to ESB and click on Proxy Services under Services
2. Select Custom Proxy


There are two ways to fill in the information required to create the proxy.
     1. Use the Design view and fill in the required fields
     2. Use the Source view and write the proxy

The following are sample proxies that create queue, topic and durable topic subscribers. You can customize them according to your requirements.

Queue Subscriber

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="QueueSubscriber"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <log level="full">
            <property name="QueueSubscriber"
                      value="==REACHED_QueueSubscriber=="/>
         </log>
         <drop/>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <parameter name="transport.jms.DestinationType">queue</parameter>
   <parameter name="transport.jms.Destination">WeatherForecast</parameter>
   <parameter name="transport.jms.ContentType">application/xml</parameter>
   <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
   <description/>
</proxy>

Temporary Topic Subscriber

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="TopicSubscriber"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <log level="full">
            <property name="TopicSubscriber"
                      value="==REACHED_TopicSubscriber=="/>
         </log>
         <drop/>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <parameter name="transport.jms.DestinationType">topic</parameter>
   <parameter name="transport.jms.Destination">MyTopic</parameter>
   <parameter name="transport.jms.ContentType">application/xml</parameter>
   <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
   <description/>
</proxy>

Durable Topic Subscriber

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="DurableTopicSubscriber"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="OUT_ONLY" value="true"/>
         <log level="full">
            <property name="DurableTopicSubscriber"
                      value="==REACHED_DurableTopicSubscriber=="/>
         </log>
         <drop/>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <parameter name="transport.jms.DestinationType">topic</parameter>
   <parameter name="transport.jms.DurableSubscriberName">SubscriberName</parameter>
   <parameter name="transport.jms.Destination">MyTopic</parameter>
   <parameter name="transport.jms.ContentType">application/xml</parameter>
   <parameter name="transport.jms.SubscriptionDurable">true</parameter>
   <parameter name="transport.jms.DurableSubscriberClientID">SubscriberClientID</parameter>
   <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
   <parameter name="transport.jms.CacheLevel">consumer</parameter>
   <description/>
</proxy>

Queue Publisher

<?xml version="1.0" encoding="UTF-8"?>

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="Publisher1"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <log>
            <property name="Publisher1"
                      value="==Message=="/>
         </log>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
         <send>
            <endpoint name="queueEndpoint">
               <address uri="jms:/WeatherForecast?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence/>
      <faultSequence/>
   </target>
   <description/>
</proxy>               

Topic Publisher

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="Publisher2"
       transports="http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <log>
            <property name="Publisher2"
                      value="==Message=="/>
         </log>
         <property name="OUT_ONLY" value="true"/>
         <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
         <send>
            <endpoint name="topicEndpoint">
               <address uri="jms:/MyTopic?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=topic"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence/>
      <faultSequence/>
   </target>
   <description/>
</proxy>
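
The long address URIs in the two publisher proxies are easy to mistype, so here is a small Python sketch that assembles them from their parts. It is only an illustration under my own assumptions: the function names are hypothetical, and the parameter values are the ones used in the proxies above.

```python
from xml.sax.saxutils import escape

# Values taken from the publisher proxies in this post.
INITIAL_CONTEXT_FACTORY = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"
PROVIDER_URL = "repository/conf/jndi.properties"


def jms_endpoint_uri(destination: str, factory_jndi_name: str, destination_type: str) -> str:
    """Build the raw jms:/ URI (with plain '&' separators) for an address endpoint."""
    if destination_type not in ("queue", "topic"):
        raise ValueError("destination_type must be 'queue' or 'topic'")
    params = [
        ("transport.jms.ConnectionFactoryJNDIName", factory_jndi_name),
        ("java.naming.factory.initial", INITIAL_CONTEXT_FACTORY),
        ("java.naming.provider.url", PROVIDER_URL),
        ("transport.jms.DestinationType", destination_type),
    ]
    query = "&".join(f"{key}={value}" for key, value in params)
    return f"jms:/{destination}?{query}"


def as_xml_attribute(uri: str) -> str:
    """Escape the URI for use inside an XML attribute ('&' becomes '&amp;')."""
    return escape(uri)


if __name__ == "__main__":
    print(as_xml_attribute(jms_endpoint_uri("WeatherForecast", "QueueConnectionFactory", "queue")))
    print(as_xml_attribute(jms_endpoint_uri("MyTopic", "TopicConnectionFactory", "topic")))
```

The escaping step matters because the proxy is an XML document: a bare `&` between parameters would make the configuration invalid, which is why the proxies above use `&amp;`.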

Once you have successfully deployed the proxy services, you can verify them in the management console of Message Broker. On the broker side you will see a queue named WeatherForecast, the topics News and Sports, and three active subscribers. You can now send messages to the publisher proxies and confirm that the subscribers consume them.
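
One way to send a test message is a plain HTTP POST to a publisher proxy. The Python sketch below shows the idea under several assumptions of mine: ESB is listening for HTTP on localhost port 8280 (what I would expect with no port offset on the ESB side), the proxy is exposed under /services/<proxy name>, and the proxy accepts an application/xml payload. The function names and the sample payload are hypothetical.

```python
import urllib.request


def build_publish_request(proxy_name: str, payload: str,
                          host: str = "localhost", port: int = 8280) -> urllib.request.Request:
    """Prepare (but do not send) an HTTP POST to a publisher proxy service."""
    url = f"http://{host}:{port}/services/{proxy_name}"
    return urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/xml"},
        method="POST",
    )


def publish(proxy_name: str, payload: str, **kwargs) -> int:
    """Send the message and return the HTTP status code of the response."""
    request = build_publish_request(proxy_name, payload, **kwargs)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


# Example usage (requires ESB and MB to be running):
#   publish("Publisher1", "<forecast><city>Colombo</city></forecast>")
#   publish("Publisher2", "<news><headline>Match report</headline></news>")
```

After calling `publish` for Publisher1 or Publisher2, the matching subscriber proxy should write its ==REACHED_...== log line to the ESB console if everything is wired up correctly.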


