Chapter 3. The Spring JMS Input and Output Adapters

This chapter discusses the input and output adapters for JMS that are based on the Spring JmsTemplate technology. For more information on Spring, including its latest version, please visit http://www.springframework.org

3.1. Introduction

Here are the steps to use the adapters:

  1. Configure an Esper engine instance to use a SpringContextLoader for loading input and output adapters, and point it to a Spring JmsTemplate configuration file.

  2. Create a Spring JmsTemplate configuration file for your JMS provider and add all your input and output adapter entries in the same file.

  3. For receiving events from a JMS destination into an engine (input adapter):

    1. List the destination and un-marshalling class in the Spring configuration.

    2. Create EPL statements using the event type name matching the event objects or the Map-event type names received.

  4. For sending events to a JMS destination (output adapter):

    1. Use the insert-into syntax, naming the target stream with the same name as listed in the Spring configuration file.

    2. Configure the Map event type of the stream in the engine configuration.

In summary the Spring JMS input adapter performs the following functions:

  • Initialize from a given Spring configuration file on the classpath or from a filename. The Spring configuration file sets all JMS parameters such as the JMS connection factory, destination and listener pools.

  • Attach to a JMS destination and listen to messages using the Spring class org.springframework.jms.core.JmsTemplate

  • Unmarshal a JMS message and send it into the configured engine instance

The Spring JMS output adapter can:

  • Initialize from a given Spring configuration file on the classpath or from a filename, and attach to a JMS destination

  • Act as a listener to one or more named streams populated via insert-into syntax by EPL statements

  • Marshal events generated by a stream into a JMS message, and send to the given destination

3.2. Engine Configuration

The Spring JMS input and output adapters are configured as part of the Esper engine configuration. EsperIO supplies a SpringContextLoader class that loads a Spring configuration file, which in turn configures the JMS input and output adapters. List the SpringContextLoader class as an adapter loader in the Esper configuration file, as the example below shows. Alternatively, the configuration API can be used to configure one or more adapter loaders.

<esper-configuration>

  <!-- Sample configuration for an input/output adapter loader -->
  <plugin-loader name="MyLoader" class-name="com.espertech.esperio.SpringContextLoader">
    <!-- SpringApplicationContext translates into Spring ClassPathXmlApplicationContext 
           or FileSystemXmlApplicationContext. Only one app-context of a sort can be used. 
           When both attributes are used classpath and file, classpath prevails -->
    <init-arg name="classpath-app-context" value="spring\jms-spring.xml" />
    <init-arg name="file-app-context" value="spring\jms-spring.xml" />
  </plugin-loader>

</esper-configuration>

The loader loads the Spring configuration file from classpath via the classpath-app-context configuration, or from a file via file-app-context.
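
The precedence rule stated in the XML comment above (classpath prevails when both attributes are given) can be pictured with a small plain-Java sketch. This is not the SpringContextLoader source: the class AppContextChoice and its resolve method are hypothetical, and the exception raised when neither attribute is present is an assumption. Only the two init-arg names are taken from the sample configuration.

```java
import java.util.Properties;

// Hypothetical helper, not part of EsperIO: models which init-arg wins.
final class AppContextChoice {
    static final String CLASSPATH_KEY = "classpath-app-context";
    static final String FILE_KEY = "file-app-context";

    /**
     * Returns "classpath:<value>" or "file:<value>".
     * The classpath entry prevails when both init-args are present.
     */
    static String resolve(Properties initArgs) {
        String classpath = initArgs.getProperty(CLASSPATH_KEY);
        if (classpath != null) {
            return "classpath:" + classpath;
        }
        String file = initArgs.getProperty(FILE_KEY);
        if (file != null) {
            return "file:" + file;
        }
        // Assumption: the sketch rejects a loader with no context location at all.
        throw new IllegalArgumentException(
            "Neither " + CLASSPATH_KEY + " nor " + FILE_KEY + " is set");
    }
}
```

In practice this suggests listing only one of the two init-args per loader, so that the location actually used is unambiguous.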

3.3. Input Adapter

3.3.1. Spring Configuration

The Spring configuration file must list the input and output adapters to be initialized by SpringContextLoader upon engine initialization. Please refer to your JMS provider documentation and the Spring framework documentation for help with configuring your specific JMS provider via Spring.

The next XML snippet shows a complete sample configuration for an input adapter. The sample includes the JMS configuration for an Apache ActiveMQ JMS provider.

<!-- Spring Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
  </bean>

  <!-- Provides listener threads -->
  <bean id="listenerContainer" 
              class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsActiveMQFactory"/>
    <property name="destination" ref="defaultDestination"/>
    <property name="messageListener" ref="jmsInputAdapter"/>
  </bean>

  <!-- Default unmarshaller -->
  <bean id="jmsMessageUnmarshaller" 
              class="com.espertech.esperio.jms.JMSDefaultAnyMessageUnmarshaller"/>

  <!-- Input adapter -->
  <bean id="jmsInputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateInputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="jmsMessageUnmarshaller">
      <ref bean="jmsMessageUnmarshaller"/>
    </property>
  </bean>

</beans>

This input adapter attaches to the JMS destination ESPER.QUEUE at an Apache ActiveMQ broker available at tcp://localhost:61616. It configures an unmarshalling class as discussed next.

3.3.2. JMS Message Unmarshalling

EsperIO provides the class JMSDefaultAnyMessageUnmarshaller for unmarshalling JMS message instances into events for processing by an engine. The class unmarshals as follows:

  • If the received Message is of type javax.jms.MapMessage, extract the event type name out of the message and send to the engine via sendEvent(name, Map)

  • If the received Message is of type javax.jms.ObjectMessage, extract the Serializable out of the message and send to the engine via sendEvent(Object)

  • Otherwise the unmarshaller outputs a warning and ignores the message

The unmarshaller must be made aware of the event type of events within MapMessage messages. This is achieved by the client application setting a well-defined property on the message: InputAdapter.ESPERIO_MAP_EVENT_TYPE. An example code snippet is:

MapMessage mapMessage = jmsSession.createMapMessage();
mapMessage.setObject(InputAdapter.ESPERIO_MAP_EVENT_TYPE, "MyInputEvent");
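
The three-way dispatch described above can be sketched in plain Java as follows. This is an illustration only, not the JMSDefaultAnyMessageUnmarshaller source. So that it compiles without a JMS or Esper jar, the types Msg, MapMsg, ObjectMsg and OtherMsg are hypothetical stand-ins for the javax.jms message interfaces, and TYPE_KEY is a placeholder for the actual value of InputAdapter.ESPERIO_MAP_EVENT_TYPE.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; the real adapter works on javax.jms message types.
final class UnmarshalSketch {
    // Placeholder key; the real constant is InputAdapter.ESPERIO_MAP_EVENT_TYPE.
    static final String TYPE_KEY = "event-type-key-placeholder";

    interface Msg { }

    static final class MapMsg implements Msg {
        final Map<String, Object> body = new HashMap<>();
    }

    static final class ObjectMsg implements Msg {
        final Serializable payload;
        ObjectMsg(Serializable payload) { this.payload = payload; }
    }

    static final class OtherMsg implements Msg { }

    /** Describes how a message would be routed; nothing is sent to an engine here. */
    static String dispatch(Msg message) {
        if (message instanceof MapMsg) {
            // The sender stored the event type name in the map under the well-known key.
            Object typeName = ((MapMsg) message).body.get(TYPE_KEY);
            return "map event of type " + typeName;
        }
        if (message instanceof ObjectMsg) {
            return "object event " + ((ObjectMsg) message).payload;
        }
        return "ignored with a warning";
    }
}
```

The sketch makes the role of the event type entry visible: a map message carries no class information of its own, so the type name has to travel with the message.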

3.4. Output Adapter

3.4.1. Spring Configuration

The Spring configuration file lists all input and output adapters in one file. The SpringContextLoader upon engine initialization starts all input and output adapters.

The next XML snippet shows a complete sample configuration of an output adapter. Please check with your JMS provider for the appropriate Spring class names and settings. Note that the input and output adapter Spring configurations can be in the same file.

<!-- Application Context -->
<beans default-destroy-method="destroy">

  <!-- JMS ActiveMQ Connection Factory -->
  <bean id="jmsActiveMQFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
      </bean>
    </property>
  </bean>

  <!--  ActiveMQ destination to use  by default -->
  <bean id="defaultDestination"
        class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="ESPER.QUEUE"/>
  </bean>

  <!--  Spring JMS Template for ActiveMQ -->
  <bean id="jmsActiveMQTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
      <ref bean="jmsActiveMQFactory"/>
    </property>
    <property name="defaultDestination">
      <ref bean="defaultDestination"/>
    </property>
    <property name="receiveTimeout">
      <value>30000</value>
    </property>
  </bean>

  <!--  Marshaller marshals events into map messages -->
  <bean id="jmsMessageMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>
  <bean id="myCustomMarshaller" class="com.espertech.esperio.jms.JMSDefaultMapMessageMarshaller"/>

  <!--  Output adapter puts it all together -->
  <bean id="jmsOutputAdapter" class="com.espertech.esperio.jms.SpringJMSTemplateOutputAdapter">
    <property name="jmsTemplate">
      <ref bean="jmsActiveMQTemplate"/>
    </property>
    <property name="subscriptionMap">
      <map>
        <entry>
          <key><idref local="subscriptionOne"/></key>
          <ref bean="subscriptionOne"/>
        </entry>
        <entry>
          <key><idref local="subscriptionTwo"/></key>
          <ref bean="subscriptionTwo"/>
        </entry>
      </map>
    </property>
    <property name="jmsMessageMarshaller">
      <ref bean="jmsMessageMarshaller"/>
    </property>
  </bean>

  <bean id="subscriptionOne" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOutputStream"/>
  </bean>

  <bean id="subscriptionTwo" class="com.espertech.esperio.jms.JMSSubscription">
    <property name="eventTypeName" value="MyOtherOutputStream"/>
    <property name="jmsMessageMarshaller">
      <ref bean="myCustomMarshaller"/>
    </property>
  </bean>

</beans>

3.4.2. JMS Message Marshalling

EsperIO provides a marshaller implementation in the class JMSDefaultMapMessageMarshaller. This marshaller constructs a JMS MapMessage from any event received by copying the event properties into the name-value pairs of the message. The configuration file makes it easy to configure a custom marshaller that adheres to the com.espertech.esperio.jms.JMSMessageMarshaller interface.

Note that this marshaller uses javax.jms.MapMessage name-value pairs and not general javax.jms.Message properties. This means that when you read the event properties back from the JMS MapMessage, you have to use the javax.jms.MapMessage.getObject(...) method.
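
As a rough illustration of the copy step, the following plain-Java sketch marshals an event's properties into name-value pairs and reads them back. It is not the JMSDefaultMapMessageMarshaller source. The class MapBodySketch is a hypothetical stand-in for the body of a javax.jms.MapMessage, so that the sketch runs without a JMS jar. Its setObject and getObject methods mirror the names of the MapMessage methods.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the body of a javax.jms.MapMessage.
final class MapBodySketch {
    private final Map<String, Object> pairs = new LinkedHashMap<>();

    void setObject(String name, Object value) { pairs.put(name, value); }

    Object getObject(String name) { return pairs.get(name); }

    /** Copies every event property into a name-value pair of the message body. */
    static MapBodySketch marshal(Map<String, Object> eventProperties) {
        MapBodySketch body = new MapBodySketch();
        for (Map.Entry<String, Object> property : eventProperties.entrySet()) {
            body.setObject(property.getKey(), property.getValue());
        }
        return body;
    }
}
```

On a real consumer the equivalent read would be MapMessage.getObject("assetId"). A lookup through Message.getObjectProperty("assetId") targets the message properties instead and would not find a value stored in the body.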

The SpringJMSTemplateOutputAdapter is configured with a map of subscription instances of type JMSSubscription (the subscriptionMap property), as the sample configuration shows. Each subscription defines an event type name that must be configured in the engine and used in the insert-into syntax of a statement.
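
The relationship between streams, subscriptions and marshallers in the sample configuration can be modelled with the plain-Java sketch below. It is a hypothetical model, not the EsperIO classes: SubscriptionSketch and its members are invented for illustration, marshallers are represented by their bean ids only, and the model is keyed by stream name rather than by bean id. The rule that a marshaller set on a subscription takes precedence over the one set on the adapter is an assumption inferred from the sample, where only subscriptionTwo names its own marshaller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the sample configuration; not the EsperIO classes.
final class SubscriptionSketch {
    /** One subscription: a stream name plus an optional marshaller bean id. */
    static final class Subscription {
        final String eventTypeName;
        final String marshaller; // null means none configured on the subscription

        Subscription(String eventTypeName, String marshaller) {
            this.eventTypeName = eventTypeName;
            this.marshaller = marshaller;
        }
    }

    private final Map<String, Subscription> byStream = new HashMap<>();
    private final String adapterMarshaller;

    SubscriptionSketch(String adapterMarshaller) {
        this.adapterMarshaller = adapterMarshaller;
    }

    void add(Subscription subscription) {
        byStream.put(subscription.eventTypeName, subscription);
    }

    /** Assumed rule: a subscription-level marshaller overrides the adapter-level one. */
    String marshallerFor(String streamName) {
        Subscription subscription = byStream.get(streamName);
        if (subscription == null) {
            return null; // no subscription listens to this stream
        }
        return subscription.marshaller != null ? subscription.marshaller : adapterMarshaller;
    }
}
```

Under this assumption, events inserted into MyOutputStream would be marshalled by the adapter's jmsMessageMarshaller, and events inserted into MyOtherOutputStream by myCustomMarshaller.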

To connect the Spring JMS output adapter and the EPL statements producing events, use the insert-into syntax to direct events for output. Here is a sample statement that sends events into MyOutputStream:

insert into MyOutputStream select assetId, zone from RFIDEvent

The type MyOutputStream must be known to the engine instance. The output adapter requires the name to be configured with the engine instance, for example:

<esper-configuration>
  <event-type name="MyOutputStream">
    <java-util-map>
      <map-property name="assetId" class="String"/>
      <map-property name="zone" class="int"/>
    </java-util-map>
  </event-type>
</esper-configuration>

© 2011 EsperTech Inc. All Rights Reserved