Esper - Java Event Stream Processor

Esper Reference Documentation

1.10.0


Table of Contents

Preface
1. Technology Overview
1.1. Introduction to CEP and event stream analysis
1.2. CEP and relational databases
1.3. The Esper engine for CEP
1.4. Required 3rd Party Libraries
2. Configuration
2.1. Programmatic Configuration
2.2. Configuration via XML File
2.3. XML Configuration File
2.4. Configuration Items
2.4.1. Events represented by Java Classes
2.4.1.1. Event type alias to Java class mapping
2.4.1.2. Non-JavaBean and Legacy Java Event Classes
2.4.1.3. Specifying Event Properties for Java Classes
2.4.1.4. Turning off Code Generation
2.4.1.5. Case Sensitivity and Property Names
2.4.2. Events represented by java.util.Map
2.4.3. Events represented by org.w3c.dom.Node
2.4.3.1. Schema Resource
2.4.3.2. XPath Property
2.4.4. Class and package imports
2.4.5. Relational Database Access
2.4.5.1. Connections obtained via DataSource
2.4.5.2. Connections obtained via DriverManager
2.4.5.3. Connections-level settings
2.4.5.4. Connections lifecycle settings
2.4.5.5. Cache settings
2.4.6. Engine Settings related to Concurrency and Threading
2.4.6.1. Preserving the order of events delivered to listeners
2.4.6.2. Preserving the order of events for insert-into streams
2.4.6.3. Internal Timer Settings
2.4.7. Engine Settings related to Event Metadata
2.4.7.1. Java Class Property Names and Case Sensitivity
2.4.8. Engine Settings related to View Resources
2.4.8.1. Sharing View Resources between Statements
2.4.9. Engine Settings related to Logging
2.4.9.1. Execution Path Debug Logging
3. API Reference
3.1. API Overview
3.2. Engine Instances
3.3. The Administrative Interface
3.3.1. Creating Statements
3.3.2. Adding Listeners
3.3.3. Using Iterators
3.3.4. Managing Statements
3.3.5. Runtime Engine Configuration
3.4. The Runtime Interface
3.5. Time-Keeping Events
3.6. Events Received from the Engine
3.7. Engine Threading and Concurrency
4. Understanding the Output Model
4.1. Introduction
4.2. Insert Stream
4.3. Insert and Remove Stream
4.4. Filters and Where-clauses
4.5. Time Windows
4.5.1. Time Window
4.5.2. Time Batch
4.6. Aggregation and Grouping
4.6.1. Insert and Remove Stream
4.6.2. Output for Event Batches
4.6.2.1. Un-aggregated and Un-grouped
4.6.2.2. Fully Aggregated and Un-grouped
4.6.2.3. Aggregated and Un-Grouped
4.6.2.4. Fully Aggregated and Grouped
4.6.2.5. Aggregated and Grouped
4.7. EventBean Query Results
5. Event Representations
5.1. Event Underlying Java Objects
5.2. Event Properties
5.3. Plain Java Object Events
5.3.1. Java Object Event Properties
5.4. java.util.Map Events
5.5. org.w3c.dom.Node XML Events
6. EQL Reference
6.1. EQL Introduction
6.2. EQL Syntax
6.2.1. Specifying Time Periods
6.3. Choosing Event Properties And Events: the Select Clause
6.3.1. Choosing all event properties: select *
6.3.2. Choosing specific event properties
6.3.3. Expressions
6.3.4. Renaming event properties
6.3.5. Selecting istream and rstream events
6.4. Specifying Event Streams : the From Clause
6.4.1. Filter-based event streams
6.4.1.1. Specifying an event type
6.4.1.2. Specifying filter criteria
6.4.1.3. Filtering Ranges
6.4.1.4. Filtering Sets of Values
6.4.1.5. Filter Limitations
6.4.2. Pattern-based event streams
6.4.3. Specifying views
6.5. Specifying Search Conditions: the Where Clause
6.6. Aggregates and grouping: the Group-by Clause and the Having Clause
6.6.1. Using aggregate functions
6.6.2. Organizing statement results into groups: the Group-by clause
6.6.3. Selecting groups of events: the Having clause
6.6.4. How the stream filter, Where, Group By and Having clauses interact
6.6.5. Comparing the Group By clause and the std:groupby view
6.7. Stabilizing and Limiting Output: the Output Clause
6.7.1. Output Clause Options
6.7.2. Group By, Having and Output clause interaction
6.8. Sorting Output: the Order By Clause
6.9. Merging Streams and Continuous Insertion: the Insert Into Clause
6.10. Joining Event Streams
6.11. Outer Joins
6.12. Subqueries
6.12.1. The 'exists' keyword
6.12.2. The 'in' keyword
6.13. Joining Relational Data via SQL
6.13.1. Joining SQL Query Results
6.13.2. Outer Joins With SQL Queries
6.13.3. Using Patterns to Request (Poll) Data
6.13.4. JDBC Implementation Overview
6.14. Single-row Function Reference
6.14.1. The Min and Max Functions
6.14.2. The Coalesce Function
6.14.3. The Case Control Flow Function
6.14.4. The Previous Function
6.14.4.1. Previous Event per Group
6.14.4.2. Restrictions
6.14.4.3. Comparison to the prior Function
6.14.5. The Prior Function
6.15. Operator Reference
6.15.1. Arithmetic Operators
6.15.2. Logical And Comparison Operators
6.15.3. Concatenation Operators
6.15.4. Binary Operators
6.15.5. Array Definition Operator
6.15.6. The 'in' Keyword
6.15.7. The 'between' Keyword
6.15.8. The 'like' Keyword
6.15.9. The 'regexp' Keyword
6.16. Built-in views
6.16.1. Window views
6.16.1.1. Length window (win:length)
6.16.1.2. Length window batch (win:length_batch)
6.16.1.3. Time window (win:time)
6.16.1.4. Externally-timed window (win:ext_timed)
6.16.1.5. Time window batch (win:time_batch)
6.16.2. Standard view set
6.16.2.1. Unique (std:unique)
6.16.2.2. Group By (std:groupby)
6.16.2.3. Size (std:size)
6.16.2.4. Last (std:lastevent)
6.16.3. Statistics views
6.16.3.1. Univariate statistics (stat:uni)
6.16.3.2. Regression (stat:linest)
6.16.3.3. Correlation (stat:correl)
6.16.3.4. Weighted average (stat:weighted_avg)
6.16.3.5. Multi-dimensional statistics (stat:cube)
6.16.4. Extension View Set
6.16.4.1. Sorted Window View (ext:sort)
6.17. User-Defined Functions
7. Event Pattern Reference
7.1. Event Pattern Overview
7.2. How to use Patterns
7.2.1. Pattern Syntax
7.2.2. Subscribing to Pattern Events
7.2.3. Pulling Data from Patterns
7.3. Operator Precedence
7.4. Filter Expressions In Patterns
7.5. Pattern Operators
7.5.1. Every
7.5.1.1. Every Operator Example
7.5.1.2. Sensor Example
7.5.2. And
7.5.3. Or
7.5.4. Not
7.5.5. Followed-by
7.6. Pattern Guards
7.6.1. timer:within
7.7. Pattern Observers
7.7.1. timer:interval
7.7.2. timer:at
8. Extension and Plug-in
8.1. Overview
8.2. Custom View Implementation
8.2.1. Implementing a View Factory
8.2.2. Implementing a View
8.2.3. Configuring View Namespace and Name
8.3. Custom Aggregation Functions
8.3.1. Implementing an Aggregation Function
8.3.2. Configuring Aggregation Function Name
8.4. Custom Pattern Guard
8.4.1. Implementing a Guard Factory
8.4.2. Implementing a Guard Class
8.4.3. Configuring Guard Namespace and Name
8.5. Custom Pattern Observer
8.5.1. Implementing an Observer Factory
8.5.2. Implementing an Observer Class
8.5.3. Configuring Observer Namespace and Name
9. Examples, Tutorials, Case Studies
9.1. Examples Overview
9.2. Market Data Feed Monitor
9.2.1. Input Events
9.2.2. Computing Rates Per Feed
9.2.3. Detecting a Fall-off
9.2.4. Event generator
9.3. Transaction 3-Event Challenge
9.3.1. The Events
9.3.2. Combined event
9.3.3. Real time summary data
9.3.4. Find problems
9.3.5. Event generator
9.4. J2EE Self-Service Terminal Management
9.4.1. Events
9.4.2. Detecting Customer Check-in Issues
9.4.3. Absence of Status Events
9.4.4. Activity Summary Data
9.4.5. Sample Application for J2EE Application Server
9.4.5.1. Running the Example
9.4.5.2. Building the Example
9.4.5.3. Running the Event Simulator and Receiver
9.5. Assets Moving Across Zones - An RFID Example
9.6. AutoID RFID Reader generating XML documents
9.7. StockTicker
9.8. MatchMaker
9.9. QualityOfService
9.10. LinearRoad
9.11. StockTick RSI
10. Performance
10.1. Performance Results
10.2. Performance Tips
10.2.1. Understand how to tune your Java virtual machine
10.2.2. Compare Esper to other solutions
10.2.3. Select the underlying event rather than individual fields
10.2.4. Prefer stream-level filtering over post-data-window filtering
10.2.5. Reduce the use of arithmetic in expressions
10.2.6. Consider using EventPropertyGetter for fast access to event properties
10.2.7. Consider casting the underlying event
10.2.8. Turn off logging
10.3. Using the performance kit
10.3.1. How to use the performance kit
10.3.2. How we use the performance kit
11. References
11.1. Reference List

Preface

Analyzing and reacting to information in real-time oftentimes requires the development of custom applications. Typically these applications must obtain the data to analyze, filter data, derive information and then indicate this information through some form of presentation or communication. Data may arrive with high frequency requiring high throughput processing. And applications may need to be flexible and react to changes in requirements while the data is processed. Esper is an event stream processor that aims to enable a short development cycle from inception to production for these types of applications.

If you are new to Esper, please follow these steps:

  1. Read the tutorials, case studies and solution patterns available on the Esper public web site at http://esper.codehaus.org

  2. Read Section 1.1, “Introduction to CEP and event stream analysis” if you are new to CEP and ESP (complex event processing, event stream processing)

  3. Read Section 6.1, “EQL Introduction” for an introduction to event stream processing via EQL

  4. Read Section 7.1, “Event Pattern Overview” for an overview of event patterns

  5. Read Chapter 4, Understanding the Output Model to gain insight into EQL continuous query results

  6. Then glance over the examples in Section 9.1, “Examples Overview”

  7. Finally, to test drive Esper performance, read Chapter 10, Performance

Chapter 1. Technology Overview

1.1. Introduction to CEP and event stream analysis

The Esper engine has been developed to address the requirements of applications that analyze and react to events. Some typical examples of applications are:

  • Business process management and automation (process monitoring, BAM, reporting exceptions)

  • Finance (algorithmic trading, fraud detection, risk management)

  • Network and application monitoring (intrusion detection, SLA monitoring)

  • Sensor network applications (RFID reading, scheduling and control of fabrication lines, air traffic)

What these applications have in common is the requirement to process events (or messages) in real-time or near real-time. This is sometimes referred to as complex event processing (CEP) and event stream analysis. Key considerations for these types of applications are throughput, latency and the complexity of the logic required.

  • High throughput - applications that process large volumes of messages (between 1,000 and 100,000 messages per second)

  • Low latency - applications that react in real-time to conditions that occur (from a few milliseconds to a few seconds)

  • Complex computations - applications that detect patterns among events (event correlation), filter events, aggregate time or length windows of events, join event streams, trigger based on absence of events etc.

The Esper engine was designed to make it easier to build and extend CEP applications.

1.2. CEP and relational databases

Relational databases and the Structured Query Language (SQL) are designed for applications in which most data is fairly static and complex queries are less frequent. Also, most databases store all data on disks (except for in-memory databases) and are therefore optimized for disk access.

To retrieve data from a database an application must issue a query. If an application needs the data 10 times per second, it must fire the query 10 times per second. This does not scale well to hundreds or thousands of queries per second.

Database triggers can be used to fire in response to database update events. However, database triggers tend to be slow and often cannot easily perform complex condition checking and implement logic to react.

In-memory databases may be better suited to CEP applications than traditional relational databases as they generally have good query performance. Yet they are not optimized to provide the immediate, real-time query results required for CEP and event stream analysis.

1.3. The Esper engine for CEP

The Esper engine works a bit like a database turned upside-down. Instead of storing the data and running queries against stored data, the Esper engine allows applications to store queries and run the data through. Response from the Esper engine is real-time when conditions occur that match queries. The execution model is thus continuous rather than only when a query is submitted.
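The "upside-down database" idea can be illustrated with a minimal sketch in plain Java. This is not Esper code: the TinyEngine class and its register and sendEvent methods are hypothetical names chosen for illustration, and a stored query is reduced to a predicate plus a listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical miniature engine: queries are stored, events flow through. */
final class TinyEngine<E> {
    /** A stored query: a condition plus the listener to notify on a match. */
    private static final class StoredQuery<E> {
        final Predicate<E> condition;
        final Consumer<E> listener;

        StoredQuery(Predicate<E> condition, Consumer<E> listener) {
            this.condition = condition;
            this.listener = listener;
        }
    }

    private final List<StoredQuery<E>> queries = new ArrayList<>();

    /** Registers a query once; it stays active for all later events. */
    void register(Predicate<E> condition, Consumer<E> listener) {
        queries.add(new StoredQuery<>(condition, listener));
    }

    /** Runs one event through every stored query and notifies listeners at once. */
    void sendEvent(E event) {
        for (StoredQuery<E> query : queries) {
            if (query.condition.test(event)) {
                query.listener.accept(event);
            }
        }
    }
}
```

An application would register its conditions once and then call sendEvent for each arriving event; no polling query is issued, and the event itself is not stored.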

Esper provides two principal methods or mechanisms to process events: event patterns and event stream queries.

Esper offers an event pattern language to specify expression-based event pattern matching. Underlying the pattern matching engine is a state machine implementation. This method of event processing matches expected sequences of presence or absence of events or combinations of events. It includes time-based correlation of events.

Esper also offers event stream queries that address the event stream analysis requirements of CEP applications. Event stream queries provide the windows, aggregation, joining and analysis functions for use with streams of events. These queries follow the EQL syntax. EQL has been designed for similarity with the SQL query language but differs from SQL in its use of views rather than tables. Views represent the different operations needed to structure data in an event stream and to derive data from an event stream.

Esper provides these two methods as alternatives through the same API.

1.4. Required 3rd Party Libraries

Esper requires the following 3rd-party libraries at runtime:

  • ANTLR is the parser generator used for parsing and parse tree walking of the pattern and EQL syntax. Credit goes to Terence Parr at http://www.antlr.org. The ANTLR license is in the lib directory. The library is required for compile-time only.

  • CGLIB is the code generation library for fast method calls. This open source software is under the Apache license. The Apache 2.0 license is in the lib directory.

  • LOG4J and Apache commons logging are logging components. This open source software is under the Apache license. The Apache 2.0 license is in the lib directory.

Esper requires the following 3rd-party libraries at compile-time and for running the test suite:

  • JUnit is a great unit testing framework. Its license has also been placed in the lib directory. The library is required for build-time only.

  • MySQL connector library is used for testing SQL integration and is required for running the automated test suite.

Chapter 2. Configuration

Esper engine configuration is entirely optional. Esper has a very small number of configuration parameters that can be used to simplify event pattern and EQL statements, and to tune the engine behavior to specific requirements. The Esper engine works out-of-the-box without configuration.

2.1. Programmatic Configuration

An instance of net.esper.client.Configuration represents all configuration parameters. The Configuration is used to build an (immutable) EPServiceProvider, which provides the administrative and runtime interfaces for an Esper engine instance.

You may obtain a Configuration instance by instantiating it directly and adding or setting values on it. The Configuration instance is then passed to EPServiceProviderManager to obtain a configured Esper engine.

Configuration configuration = new Configuration();
configuration.addEventTypeAlias("PriceLimit", PriceLimit.class.getName());
configuration.addEventTypeAlias("StockTick", StockTick.class.getName());
configuration.addImport("org.mycompany.mypackage.MyUtility");
configuration.addImport("org.mycompany.util.*");

EPServiceProvider epService = EPServiceProviderManager.getProvider("sample", configuration);

Note that Configuration is meant only as an initialization-time object. The Esper engine represented by an EPServiceProvider is immutable and does not retain any association back to the Configuration.

2.2. Configuration via XML File

An alternative approach to configuration is to specify a configuration in an XML file.

The default name for the XML configuration file is esper.cfg.xml. Esper reads this file from the root of the CLASSPATH as an application resource via the configure method.

Configuration configuration = new Configuration();
configuration.configure();

The Configuration class can read the XML configuration file from other sources as well. The configure method accepts URL, File and String filename parameters.

Configuration configuration = new Configuration();
configuration.configure("myengine.esper.cfg.xml");

2.3. XML Configuration File

Here is an example configuration file. The schema for the configuration file can be found in the etc folder and is named esper-configuration-1-0.xsd.

<?xml version="1.0" encoding="UTF-8"?>
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
           xsi:noNamespaceSchemaLocation="esper-configuration-1-0.xsd">
  <event-type alias="StockTick" class="net.esper.example.stockticker.event.StockTick"/>
  <event-type alias="PriceLimit" class="net.esper.example.stockticker.event.PriceLimit"/>
  <auto-import import-name="org.mycompany.mypackage.MyUtility"/>
  <auto-import import-name="org.mycompany.util.*"/>
</esper-configuration>

The example above is only a subset of the configuration items available. The following sections outline the available configuration in greater detail.

2.4. Configuration Items

2.4.1. Events represented by Java Classes

2.4.1.1. Event type alias to Java class mapping

This configuration item can be used to allow event pattern statements and EQL statements to use an event type alias rather than the fully qualified Java class name. Note that Java interfaces and abstract classes are also supported as event types via the fully qualified Java class name, and an event type alias can also be defined for such classes.

The example pattern statement below first shows a pattern that uses the alias StockTick. The second pattern statement is equivalent but specifies the fully-qualified Java class name.

every StockTick(symbol='IBM')
every net.esper.example.stockticker.event.StockTick(symbol='IBM')

The event type alias can be listed in the XML configuration file as shown below. The Configuration API can also be used to programmatically specify an event type alias, as shown in an earlier code snippet.

<event-type alias="StockTick" class="net.esper.example.stockticker.event.StockTick"/>

2.4.1.2. Non-JavaBean and Legacy Java Event Classes

Esper can process Java classes that provide event properties through means other than JavaBean-style getter methods. It is not necessary that the method and member variable names in your Java class adhere to the JavaBean convention - any public methods and public member variables can be exposed as event properties via the configuration below.

A Java class can optionally be configured with an accessor style attribute. This attribute instructs the engine how it should expose methods and fields for use as event properties in statements.

Table 2.1. Accessor Styles

Style Name    Description
javabean      As the default setting, the engine exposes an event property for each public method following the JavaBean getter-method conventions
public        The engine exposes an event property for each public method and public member variable of the given class
explicit      The engine exposes an event property only for the explicitly configured public methods and public member variables

Using the public setting for the accessor-style attribute instructs the engine to expose an event property for each public method and public member variable of a Java class. The engine assigns event property names of the same name as the name of the method or member variable in the Java class.

For example, assuming the class MyLegacyEvent exposes a method named readValue and a member variable named myField, we can then use properties as shown.

select readValue, myField from MyLegacyEvent
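The difference between the javabean and public accessor styles can be sketched with plain Java reflection. The sketch below is an illustration and not the engine's implementation. The AccessorStyles class is a hypothetical name, the getPrice method is added to the example class for contrast, and the sketch assumes that only no-argument, non-void public methods declared directly on the class are considered.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical legacy event class, mirroring the MyLegacyEvent example. */
class MyLegacyEvent {
    public int myField = 42;
    public String readValue() { return "v"; }
    public double getPrice() { return 1.5; }
}

/** Sketch of how property names could be derived per accessor style. */
final class AccessorStyles {
    /** "javabean" style: one property per public getXxx() or boolean isXxx() method. */
    static Set<String> javabean(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method m : type.getDeclaredMethods()) {
            if (!isPublicNoArgGetter(m)) continue;
            String n = m.getName();
            if (n.startsWith("get") && n.length() > 3) {
                names.add(decapitalize(n.substring(3)));
            } else if (n.startsWith("is") && n.length() > 2
                    && m.getReturnType() == boolean.class) {
                names.add(decapitalize(n.substring(2)));
            }
        }
        return names;
    }

    /** "public" style: one property per public method and public field, named as declared. */
    static Set<String> publicStyle(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Method m : type.getDeclaredMethods()) {
            if (isPublicNoArgGetter(m)) names.add(m.getName());
        }
        for (Field f : type.getDeclaredFields()) {
            if (Modifier.isPublic(f.getModifiers()) && !f.isSynthetic()) {
                names.add(f.getName());
            }
        }
        return names;
    }

    /** Assumption of this sketch: only public, no-argument, non-void methods qualify. */
    private static boolean isPublicNoArgGetter(Method m) {
        return Modifier.isPublic(m.getModifiers()) && !m.isSynthetic()
                && m.getParameterCount() == 0 && m.getReturnType() != void.class;
    }

    /** Simplified decapitalization: lower-cases the first character only. */
    private static String decapitalize(String s) {
        return Character.toLowerCase(s.charAt(0)) + s.substring(1);
    }
}
```

For the sample class, the javabean style yields only price, while the public style yields getPrice, myField and readValue. This is why readValue and myField become usable in a statement only under the public setting.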

Using the explicit setting for the accessor-style attribute requires that event properties are declared via configuration. This is outlined in the next section.

When configuring an engine instance from an XML configuration file, the XML snippet below demonstrates the use of the legacy-type element and the accessor-style attribute.

<event-type alias="MyLegacyEvent" class="com.mycompany.mypackage.MyLegacyEventClass">
  <legacy-type accessor-style="public"/>
</event-type>

When configuring an engine instance via Configuration API, the sample code below shows how to set the accessor style.

Configuration configuration = new Configuration();
ConfigurationEventTypeLegacy legacyDef = new ConfigurationEventTypeLegacy();
legacyDef.setAccessorStyle(ConfigurationEventTypeLegacy.AccessorStyle.PUBLIC);
configuration.addEventTypeAlias("MyLegacyEvent", MyLegacyEventClass.class.getName(), legacyDef);

EPServiceProvider epService = EPServiceProviderManager.getProvider("sample", configuration);

2.4.1.3. Specifying Event Properties for Java Classes

Sometimes it is convenient to use event property names in pattern and EQL statements that are backed by a given public method or member variable (field) of a Java class. It can also be useful to declare multiple event properties that each map to the same method or member variable.

We can configure properties of events via method-property and field-property elements, as the next example shows.

<event-type alias="StockTick" class="net.esper.example.stockticker.event.StockTickEvent">
	<legacy-type accessor-style="javabean" code-generation="enabled">
		<method-property name="price" accessor-method="getCurrentPrice" />
		<field-property name="volume" accessor-field="volumeField" />
	</legacy-type>
</event-type>

The XML configuration snippet above declared an event property named price backed by a getter-method named getCurrentPrice, and a second event property named volume that is backed by a public member variable named volumeField. Thus the price and volume properties can be used in a statement:

select avg(price * volume) from StockTick

As with all configuration options, the API can also be used:

Configuration configuration = new Configuration();
ConfigurationEventTypeLegacy legacyDef = new ConfigurationEventTypeLegacy();
legacyDef.addMethodProperty("price", "getCurrentPrice");
legacyDef.addFieldProperty("volume", "volumeField");
configuration.addEventTypeAlias("StockTick", StockTickEvent.class.getName(), legacyDef);

2.4.1.4. Turning off Code Generation

Esper employs the CGLIB library for very fast read access to event property values. For certain legacy Java classes it may be desirable to disable the use of this library and instead use Java reflection to obtain event property values from event objects.

In the XML configuration, the optional code-generation attribute in the legacy-type section can be set to disabled as shown next.

<event-type alias="MyLegacyEvent" class="com.mycompany.mypackage.MyLegacyEventClass">
	<legacy-type accessor-style="javabean" code-generation="disabled" />
</event-type>

The sample below shows how to configure this option via the API.

Configuration configuration = new Configuration();
ConfigurationEventTypeLegacy legacyDef = new ConfigurationEventTypeLegacy();
legacyDef.setCodeGeneration(ConfigurationEventTypeLegacy.CodeGeneration.DISABLED);
configuration.addEventTypeAlias("MyLegacyEvent", MyLegacyEventClass.class.getName(), legacyDef);

2.4.1.5. Case Sensitivity and Property Names

By default the engine resolves Java event properties case-sensitively. That is, property names in statements must match JavaBean-convention property names in name and case. This option controls case sensitivity per Java class.

In the configuration XML, the optional property-resolution-style attribute in the legacy-type element can be set to any of these values:

Table 2.2. Property Resolution Case Sensitivity Styles

Style Name                   Description
case_sensitive (default)     The engine matches property names by exact name and case only.
case_insensitive             The engine matches property names case-insensitively. It chooses the property whose name matches exactly or, if there is no exact match, the first property whose name matches case-insensitively.
distinct_case_insensitive    The engine matches property names case-insensitively. If more than one property matches the name case-insensitively, an exception is thrown.
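The three styles can be compared with a small stand-alone sketch. It is one reading of the table above and not the engine's code. The PropertyResolver class, its Style enum and the choice of IllegalStateException for the ambiguous case are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the three property resolution styles; all names are hypothetical. */
final class PropertyResolver {
    enum Style { CASE_SENSITIVE, CASE_INSENSITIVE, DISTINCT_CASE_INSENSITIVE }

    /**
     * Returns the declared property name that the requested name resolves to,
     * or null when nothing matches.
     */
    static String resolve(List<String> declared, String requested, Style style) {
        boolean exact = declared.contains(requested);
        if (style == Style.CASE_SENSITIVE) {
            return exact ? requested : null;
        }
        // Collect every declared name that matches when case is ignored.
        List<String> candidates = new ArrayList<>();
        for (String name : declared) {
            if (name.equalsIgnoreCase(requested)) {
                candidates.add(name);
            }
        }
        // The distinct style rejects names that map to more than one property.
        if (style == Style.DISTINCT_CASE_INSENSITIVE && candidates.size() > 1) {
            throw new IllegalStateException("Ambiguous property name: " + requested);
        }
        // Prefer the exact match, then the first case-insensitive match.
        if (exact) {
            return requested;
        }
        return candidates.isEmpty() ? null : candidates.get(0);
    }
}
```

With declared properties price, Price and volume, the name PRICE does not resolve under case_sensitive, resolves to price under case_insensitive, and is rejected as ambiguous under distinct_case_insensitive.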

The sample below shows this option in XML configuration; the setting can also be changed via the API:

<event-type alias="MyLegacyEvent" class="com.mycompany.mypackage.MyLegacyEventClass">
  <legacy-type property-resolution-style="case_insensitive"/>
</event-type>

2.4.2. Events represented by java.util.Map

The engine can process java.util.Map events via the sendEvent(Map map, String eventTypeAlias) method on the EPRuntime interface. Entries in the Map represent event properties. Keys must be of type java.lang.String for the engine to be able to look up event property names in pattern or EQL statements. Values can be of any type. JavaBean-style objects as values in a Map can also be processed by the engine. Please see Chapter 5, Event Representations for details on how to use Map events with the engine.

Via configuration we provide an event type alias name for Map events for use in statements, and the event property names and types enabling the engine to validate properties in statements.

The below snippet of XML configuration configures an event named MyMapEvent.

<event-type alias="MyMapEvent">
  <java-util-map>
    <map-property name="carId" class="int"/>
    <map-property name="carType" class="string"/>
    <map-property name="assembly" class="com.mycompany.Assembly"/>    
  </java-util-map>
</event-type>

This configuration defines the carId property of MyMapEvent events to be of type int, and the carType property to be of type java.lang.String. The assembly property of the Map event will contain instances of com.mycompany.Assembly for the engine to query.

The valid list of values for the type definition via the class attribute is:

  • string or java.lang.String

  • char or java.lang.Character

  • byte or java.lang.Byte

  • short or java.lang.Short

  • int or java.lang.Integer

  • long or java.lang.Long

  • float or java.lang.Float

  • double or java.lang.Double

  • boolean or java.lang.Boolean

  • Any fully-qualified Java class name that can be resolved by the engine via Class.forName
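The list above can be read as a lookup from a type name to a Java class, with Class.forName as the fallback. The following sketch shows that reading under stated assumptions: the MapTypeNames class is hypothetical, and mapping the short names to the boxed types (for example int to java.lang.Integer) is a choice of this sketch, not a documented engine behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: resolves the type names allowed in the class attribute of map-property. */
final class MapTypeNames {
    private static final Map<String, Class<?>> ALIASES = new HashMap<>();

    static {
        // Assumption of this sketch: short names map to the boxed Java types.
        ALIASES.put("string", String.class);
        ALIASES.put("char", Character.class);
        ALIASES.put("byte", Byte.class);
        ALIASES.put("short", Short.class);
        ALIASES.put("int", Integer.class);
        ALIASES.put("long", Long.class);
        ALIASES.put("float", Float.class);
        ALIASES.put("double", Double.class);
        ALIASES.put("boolean", Boolean.class);
    }

    /** Resolves a short name first, then falls back to Class.forName. */
    static Class<?> resolve(String typeName) {
        Class<?> alias = ALIASES.get(typeName);
        if (alias != null) {
            return alias;
        }
        try {
            return Class.forName(typeName);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown property type: " + typeName, e);
        }
    }
}
```

Resolving names up front in this way is what allows property names and types to be validated when a statement is created rather than when the first event arrives.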

You can also use the configuration API to configure Map event types, as the short code snippet below demonstrates.

Properties properties = new Properties();
properties.put("carId", "int");
properties.put("carType", "string");
properties.put("assembly", Assembly.class.getName());

Configuration configuration = new Configuration();
configuration.addEventTypeAlias("MyMapEvent", properties);

Finally, here is a sample EQL statement that uses the configured MyMapEvent map event. This statement uses the chassisTag and numParts properties of Assembly objects in each map.

select carType, assembly.chassisTag, count(assembly.numParts) from MyMapEvent.win:time(60 sec)

2.4.3. Events represented by org.w3c.dom.Node

Via this configuration item the Esper engine can natively process org.w3c.dom.Node instances, i.e. XML document object model (DOM) nodes. Please see Chapter 5, Event Representations for details on how to use Node events with the engine.

Esper allows configuring XPath expressions as event properties. You can specify arbitrary XPath functions or expressions and provide a property name by which their result values will be available for use in expressions.

For XML documents that follow an XML schema, Esper can load and interrogate your schema and validate event property names and types against the schema information.

Nested, mapped and indexed event properties are also supported in expressions against org.w3c.dom.Node events. Thus XML trees can conveniently be interrogated using the existing event property syntax for querying JavaBean objects, JavaBean object graphs or java.util.Map events.

In the simplest form, the Esper engine only requires a configuration entry containing the root element name and the event type alias in order to process org.w3c.dom.Node events:

<event-type alias="MyXMLNodeEvent">
  <xml-dom root-element-name="myevent" />
</event-type>

You can also use the configuration API to configure XML event types, as the short example below demonstrates. In fact, all configuration options available through XML configuration can also be provided via setter methods on the ConfigurationEventTypeXMLDOM class.

Configuration configuration = new Configuration();
ConfigurationEventTypeXMLDOM desc = new ConfigurationEventTypeXMLDOM();
desc.setRootElementName("myevent");
desc.addXPathProperty("name1", "/element/@attribute", XPathConstants.STRING);
desc.addXPathProperty("name2", "/element/subelement", XPathConstants.NUMBER);
configuration.addEventTypeAlias("MyXMLNodeEvent", desc);

The next example presents all relevant configuration options in a sample configuration entry.

<event-type alias="AutoIdRFIDEvent">
  <xml-dom root-element-name="Sensor" schema-resource="data/AutoIdPmlCore.xsd" 
       default-namespace="urn:autoid:specification:interchange:PMLCore:xml:schema:1">
    <namespace-prefix prefix="pmlcore" 
       namespace="urn:autoid:specification:interchange:PMLCore:xml:schema:1"/>
    <xpath-property property-name="countTags" 
       xpath="count(/pmlcore:Sensor/pmlcore:Observation/pmlcore:Tag)" type="number"/>
  </xml-dom>
</event-type>

This example configures an event property named countTags whose value is computed by an XPath expression. The namespace prefixes and default namespace are for use with XPath expressions and must also be made known to the engine in order for the engine to compile XPath expressions. Via the schema-resource attribute we instruct the engine to load a schema file.

Here is an example EQL statement using the configured event type named AutoIdRFIDEvent.

select ID, countTags from AutoIdRFIDEvent.win:time(30 sec)

2.4.3.1. Schema Resource

The schema-resource attribute takes a schema resource URL or classpath-relative filename. The engine attempts to resolve the schema resource as a URL. If the schema resource name is not a valid URL, the engine attempts to resolve the resource from classpath via the ClassLoader.getResource method using the thread context class loader. If the name could not be resolved, the engine uses the Configuration class classloader.
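The lookup order described above can be sketched in plain Java as follows. This is an illustration of the three steps and not the engine's code: the SchemaResourceLocator class is a hypothetical name, and its own class loader stands in for the class loader of the Configuration class.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

/** Sketch of the schema resource lookup order; the class name is hypothetical. */
final class SchemaResourceLocator {
    /** Returns the resolved location, or null if no step finds the resource. */
    static URL locate(String schemaResource) {
        // Step 1: treat the name as a URL if it parses as an absolute one.
        try {
            URI uri = new URI(schemaResource);
            if (uri.isAbsolute()) {
                return uri.toURL();
            }
        } catch (URISyntaxException | MalformedURLException e) {
            // Not a valid URL; fall through to the classpath lookups.
        }
        // Step 2: look on the classpath via the thread context class loader.
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context != null) {
            URL url = context.getResource(schemaResource);
            if (url != null) {
                return url;
            }
        }
        // Step 3: fall back to a fixed class loader (stand-in for Configuration's).
        ClassLoader own = SchemaResourceLocator.class.getClassLoader();
        return own != null ? own.getResource(schemaResource) : null;
    }
}
```

A name such as data/AutoIdPmlCore.xsd is not an absolute URL, so it skips the first step and is looked up relative to the classpath root.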

By configuring a schema file for the engine to load, the engine performs these additional services:

  • Validates the event properties in a statement, ensuring the event property name matches an attribute or element in the XML

  • Determines the type of the event property allowing event properties to be used in type-sensitive expressions such as expressions involving arithmetic (Note: XPath properties are also typed)

  • Matches event property names to either element names or attributes

If no schema resource is specified, none of the event properties specified in statements are validated at statement creation time and their type defaults to java.lang.String. Also, attributes are not supported if no schema resource is specified and must thus be declared via XPath expression.

2.4.3.2. XPath Property

The xpath-property element adds event properties to the event type that are computed via an XPath expression. In order for the XPath expression to compile, be sure to specify the default-namespace attribute and use the namespace-prefix to declare namespace prefixes.

XPath expression properties are strongly typed. The type attribute allows the following values. These values correspond to those declared by javax.xml.xpath.XPathConstants.

  • number (Note: resolves to a double)

  • string

  • boolean
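To illustrate how the three type values map to Java types, the sketch below evaluates XPath expressions with the standard javax.xml.xpath API. The class name XPathTypeSketch and the sample XML are assumptions made for this example only; namespaces are left out to keep it short.

```java
import java.io.StringReader;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper showing the Java type returned for each XPathConstants value.
class XPathTypeSketch {

    /** Parses the XML text and evaluates the expression with the requested return type. */
    static Object evaluate(String xml, String expression, QName returnType) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
            // NUMBER yields java.lang.Double, STRING yields java.lang.String,
            // BOOLEAN yields java.lang.Boolean.
            return XPathFactory.newInstance().newXPath().evaluate(expression, doc, returnType);
        } catch (Exception e) {
            throw new IllegalStateException("XPath evaluation failed", e);
        }
    }
}
```

For example, evaluating count(/Sensor/Tag) with XPathConstants.NUMBER against a document with two Tag elements returns a Double, which is why a number property can take part in arithmetic expressions.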

2.4.4. Class and package imports

Esper allows invocations of static Java library functions as outlined in Section 6.14, “Single-row Function Reference”. This configuration item can be set to allow a partial rather than a fully qualified class name in such invocations. The imports work in the same way as in Java files, so both packages and classes can be imported.

select Math.max(priceOne, priceTwo)
// via configuration equivalent to
select java.lang.Math.max(priceOne, priceTwo)

Esper auto-imports the following Java library packages if no other configuration is supplied. This list is replaced with any configuration specified in a configuration file or through the API.

  • java.lang.*

  • java.math.*

  • java.text.*

  • java.util.*

In an XML configuration file the auto-import configuration may look as below. Note that all configuration options are available through the Configuration API as well.

<auto-import import-name="com.mycompany.mypackage.*"/>
<auto-import import-name="com.mycompany.myapp.MyUtilityClass"/>
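The way a partial class name could be matched against package and class imports is sketched below. This is an illustration under assumptions, not the engine's resolution code: the class AutoImportSketch and its resolve method are hypothetical, and the default import list is copied from the list above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of import-based class name resolution.
class AutoImportSketch {
    static final List<String> DEFAULT_IMPORTS =
        Arrays.asList("java.lang.*", "java.math.*", "java.text.*", "java.util.*");

    /** Returns the resolved class, or null if no import matches. */
    static Class<?> resolve(String className, List<String> imports) {
        // A fully qualified name needs no import.
        Class<?> found = load(className);
        if (found != null) {
            return found;
        }
        for (String imp : imports) {
            String candidate;
            if (imp.endsWith(".*")) {
                // Package import: replace the trailing '*' with the class name.
                candidate = imp.substring(0, imp.length() - 1) + className;
            } else if (imp.endsWith("." + className)) {
                // Class import: the import itself is the fully qualified name.
                candidate = imp;
            } else {
                continue;
            }
            found = load(candidate);
            if (found != null) {
                return found;
            }
        }
        return null;
    }

    private static Class<?> load(String name) {
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }
}
```

With the default list, Math resolves to java.lang.Math. Once an application supplies its own imports, only those are consulted, which mirrors the statement above that the default list is replaced rather than extended.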

2.4.5. Relational Database Access

Esper has the capability to join event streams against historical data sources, such as a relational database. This section describes the configuration entries that the engine requires to access data stored in your database. Please see Section 6.13, “Joining Relational Data via SQL” for information on the use of EQL queries that include historical data sources.

EQL queries that poll data from a relational database specify the name of the database as part of the EQL statement. The engine uses the configuration information described here to resolve the database name in the statement to database settings. The required and optional database settings are summarized below.

  • Database connections can be obtained via JDBC javax.sql.DataSource or alternatively via java.sql.DriverManager. Configuring one of these two methods for obtaining new database connections is required.

  • Optionally, JDBC connection-level settings such as auto-commit, transaction isolation level, read-only and the catalog name can be defined.

  • Optionally, a connection lifecycle can be set to indicate to the engine whether the engine must retain connections or must obtain a new connection for each lookup.

  • Optionally, define a cache policy to allow the engine to retrieve data from a query cache, reducing the number of query executions.

Some of the settings can have important performance implications that need to be carefully considered in relationship to your database software, JDBC driver and runtime environment. This section attempts to outline such implications where appropriate.

The sample XML configuration file in the "etc" folder can be used as a template for configuring database settings. All settings are also available by means of the configuration API through the classes Configuration and ConfigurationDBRef.

2.4.5.1. Connections obtained via DataSource

The snippet of XML below configures a database named mydb1 to obtain connections via a javax.sql.DataSource. The datasource-connection element instructs the engine to obtain new connections to the database mydb1 by performing a lookup via javax.naming.InitialContext for the given object lookup name. Optional environment properties for the InitialContext are also shown in the example.

<database-reference name="mydb1">
  <datasource-connection context-lookup-name="java:comp/env/jdbc/mydb">
    <env-property name="java.naming.factory.initial" value ="com.myclass.CtxFactory"/>
    <env-property name="java.naming.provider.url" value ="iiop://localhost:1050"/>
  </datasource-connection>
</database-reference>

To help you better understand how the engine uses this information to obtain connections, we have included the logic below.

if (envProperties.size() > 0) {
  initialContext = new InitialContext(envProperties);
}
else {
  initialContext = new InitialContext();
}
DataSource dataSource = (DataSource) initialContext.lookup(lookupName);
Connection connection = dataSource.getConnection();

2.4.5.2. Connections obtained via DriverManager

The next snippet of XML configures a database named mydb2 to obtain connections via java.sql.DriverManager. The drivermanager-connection element instructs the engine to obtain new connections to the database mydb2 by means of Class.forName and DriverManager.getConnection using the class name, URL and optional username, password and connection arguments.

<database-reference name="mydb2">
  <drivermanager-connection class-name="my.sql.Driver" 
        url="jdbc:mysql://localhost/test?user=root&amp;password=mypassword" 
        user="myuser" password="mypassword">
    <connection-arg name="user" value ="myuser"/>
    <connection-arg name="password" value ="mypassword"/>
    <connection-arg name="somearg" value ="someargvalue"/>
  </drivermanager-connection>
</database-reference>

The username and password are shown in multiple places in the XML only as an example. Please check with your database software on the required information in URL and connection arguments.

2.4.5.3. Connections-level settings

Additional connection-level settings can optionally be provided to the engine which the engine will apply to new connections. When the engine obtains a new connection, it applies only those settings to the connection that are explicitly configured. The engine leaves all other connection settings at default values.

The below XML is a sample of all available configuration settings. Please refer to the Java API JavaDocs for java.sql.Connection for more information to each option or check the documentation of your JDBC driver and database software.

<database-reference name="mydb2">
... configure data source or driver manager settings...
  <connection-settings auto-commit="true" catalog="mycatalog" 
      read-only="true" transaction-isolation="1" />
</database-reference>

The read-only setting can be used to indicate to your database engine that SQL statements are read-only. The transaction-isolation and auto-commit settings help your database software perform the right level of locking and lock release. Consider setting these values to reduce transactional overhead in your database queries.
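The rule that only explicitly configured settings are applied can be sketched against the java.sql.Connection interface. The class ConnectionSettingsSketch is hypothetical and a null argument is this sketch's way of saying "not configured". It also assumes that the transaction-isolation value is handed to setTransactionIsolation unchanged, in which case the sample value 1 equals Connection.TRANSACTION_READ_UNCOMMITTED.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not the engine's code.
class ConnectionSettingsSketch {

    /** Applies only the non-null settings; all others keep the driver defaults. */
    static void apply(Connection connection, Boolean autoCommit, String catalog,
                      Boolean readOnly, Integer transactionIsolation) {
        try {
            if (autoCommit != null) {
                connection.setAutoCommit(autoCommit);
            }
            if (catalog != null) {
                connection.setCatalog(catalog);
            }
            if (readOnly != null) {
                connection.setReadOnly(readOnly);
            }
            if (transactionIsolation != null) {
                connection.setTransactionIsolation(transactionIsolation);
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Could not apply connection settings", e);
        }
    }

    /** Test double that records each call instead of talking to a database. */
    static Connection recordingConnection(List<String> calls) {
        return (Connection) Proxy.newProxyInstance(
            ConnectionSettingsSketch.class.getClassLoader(),
            new Class<?>[] { Connection.class },
            (proxy, method, args) -> {
                calls.add(method.getName() + Arrays.toString(args));
                return null;
            });
    }
}
```

The recording connection exists only so the sketch can be run without a database; in an application the Connection would come from the configured DataSource or DriverManager.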

2.4.5.4. Connections lifecycle settings

By default the engine retains a separate database connection for each started EQL statement. However, it is possible to override this behavior and require the engine to obtain a new database connection for each lookup, and to close that database connection after the lookup is completed. This often makes sense when you have a large number of EQL statements and require pooling of connections via a connection pool. If your runtime environment includes an application server, the connection pool may be exposed as a DataSource.

The XML for this option is below. The connection lifecycle allows the following values: pooled and retain.

<database-reference name="mydb2">
... configure data source or driver manager settings...
    <connection-lifecycle value="pooled"/>
</database-reference>

2.4.5.5. Cache settings

Cache settings can dramatically reduce the number of database queries that the engine executes for EQL statements. If no cache setting is specified, the engine does not cache query results and executes a separate database query for every event.

Caches store the results of database queries and make these results available to subsequent queries using the exact same query parameters as the query for which the result was stored. If your query returns one or more rows, the cache keeps the result rows of the query keyed to the parameters of the query. If your query returns no rows, the cache also keeps the empty result. Query results are held by a cache until the cache entry is evicted. The strategies available for evicting cached query results are listed next.

2.4.5.5.1. LRU Cache

The least-recently-used (LRU) cache is configured by a maximum size. The cache discards the least recently used query results first once the cache reaches the maximum size.

The XML configuration entry for an LRU cache is shown below. This entry configures an LRU cache holding up to 1000 query results.

<database-reference name="mydb">
... configure data source or driver manager settings...
    <lru-cache size="1000"/>
</database-reference>
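The eviction behavior of an LRU cache keyed by query parameters can be sketched with java.util.LinkedHashMap in access order. This is a minimal illustration, not the engine's cache: the class LruQueryCacheSketch is hypothetical, and rows are represented by a generic list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an LRU cache for query results.
class LruQueryCacheSketch<R> {
    private final Map<List<Object>, List<R>> entries;

    LruQueryCacheSketch(final int maxSize) {
        // accessOrder = true keeps the least recently used entry first.
        this.entries = new LinkedHashMap<List<Object>, List<R>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<List<Object>, List<R>> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Returns the cached rows (possibly an empty list), or null if nothing is cached. */
    synchronized List<R> get(List<Object> queryParameters) {
        return entries.get(queryParameters);
    }

    /** Stores the rows for these parameters; an empty result is cached as well. */
    synchronized void put(List<Object> queryParameters, List<R> rows) {
        entries.put(queryParameters, rows);
    }

    synchronized int size() {
        return entries.size();
    }
}
```

A null return from get means the query must be executed, whereas an empty list means the query was executed before and returned no rows.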

2.4.5.5.2. Expiry-time Cache

The expiry-time cache is configured by a maximum age in seconds and a purge interval. The cache discards (on the get operation) any query results that are older than the maximum age so that stale data is not used. If the cache is not empty, then every purge interval number of seconds the engine purges any expired entries from the cache.

The XML configuration entry for an expiry-time cache is as follows. The example configures an expiry-time cache in which prior query results are valid for 60 seconds and which the engine inspects every 2 minutes to remove query results older than 60 seconds.

<database-reference name="mydb">
... configure data source or driver manager settings...
    <expiry-time-cache max-age-seconds="60" purge-interval-seconds="120"/>
</database-reference>
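The two mechanisms of the expiry-time cache, discarding stale entries on get and removing them in a periodic purge, can be sketched as follows. The class ExpiryTimeCacheSketch is hypothetical; the clock is passed in so the behavior can be exercised without waiting, and the timer that would call purge every purge interval is left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch of an expiry-time cache for query results.
class ExpiryTimeCacheSketch<R> {
    private static final class Cached<R> {
        final List<R> rows;
        final long storedAtMillis;

        Cached(List<R> rows, long storedAtMillis) {
            this.rows = rows;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final Map<List<Object>, Cached<R>> entries = new HashMap<>();
    private final long maxAgeMillis;
    private final LongSupplier clockMillis;

    ExpiryTimeCacheSketch(long maxAgeSeconds, LongSupplier clockMillis) {
        this.maxAgeMillis = maxAgeSeconds * 1000L;
        this.clockMillis = clockMillis;
    }

    synchronized void put(List<Object> queryParameters, List<R> rows) {
        entries.put(queryParameters, new Cached<>(rows, clockMillis.getAsLong()));
    }

    /** Returns cached rows, or null if nothing is cached or the entry is too old. */
    synchronized List<R> get(List<Object> queryParameters) {
        Cached<R> cached = entries.get(queryParameters);
        if (cached == null) {
            return null;
        }
        if (isExpired(cached)) {
            entries.remove(queryParameters); // stale data is never handed out
            return null;
        }
        return cached.rows;
    }

    /** Removes all expired entries and returns how many were removed. */
    synchronized int purge() {
        int removed = 0;
        for (Iterator<Cached<R>> it = entries.values().iterator(); it.hasNext();) {
            if (isExpired(it.next())) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    private boolean isExpired(Cached<R> cached) {
        return clockMillis.getAsLong() - cached.storedAtMillis > maxAgeMillis;
    }
}
```

The check on get is what guarantees freshness; the purge only bounds memory use for entries that are never requested again, which is why a purge interval longer than the maximum age, as in the XML above, is still safe.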

2.4.6. Engine Settings related to Concurrency and Threading

2.4.6.1. Preserving the order of events delivered to listeners

In multithreaded environments, this setting controls whether dispatches of statement result events to listeners preserve the ordering in which a statement processes events. By default the engine guarantees that it delivers a statement's result events to statement listeners in the order in which the result is generated. This behavior can be turned off via configuration as below.

The next code snippet shows how to control this feature:

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setListenerDispatchPreserveOrder(false);
engine = EPServiceProviderManager.getDefaultProvider(config);

An XML configuration file can also control this feature by adding the following elements:

<engine-settings>
  <defaults>
    <threading>
      <listener-dispatch preserve-order="false" timeout-msec="2000"/>
      <insert-into-dispatch preserve-order="false"/>
    </threading>
  </defaults>
</engine-settings>

As discussed, by default the engine can temporarily block a thread when delivering result events to listeners in order to preserve the order in which results are generated by a given statement. The maximum time the engine blocks a thread can also be configured, and by default is set to 1 second.

2.4.6.2. Preserving the order of events for insert-into streams

In multithreaded environments, this setting controls whether insert-into streams preserve the order of events inserted into them by one or more statements, allowing statements that consume other statements' events to behave deterministically.

By default, the engine acquires a lock per insert-into stream when a statement makes events available to further statements using the insert into clause. The lock allows generated events to be processed by further statements consuming the insert-into stream in the order the generating statement(s) produce events. This allows statements that require order (such as pattern detection, previous and prior functions) to behave deterministically.

The setting can be changed via the configuration API and XML as shown in the prior section.

2.4.6.3. Internal Timer Settings

This option can be used to disable the internal timer thread, so that the application supplies external time events instead, as well as to set a timer resolution.

The next code snippet shows how to disable the internal timer thread via the configuration API:

Configuration config = new Configuration();
config.getEngineDefaults().getThreading().setInternalTimerEnabled(false);

This snippet of XML configuration leaves the internal timer enabled (the default) and sets a resolution of 200 milliseconds (the default is 100 milliseconds):

<engine-settings>
  <defaults>
    <threading>
      <internal-timer enabled="true" msec-resolution="200"/>
    </threading>
  </defaults>
</engine-settings>

We recommend that when disabling the internal timer, applications send an external timer event setting the start time before creating statements, such that statement start time is well-defined.

2.4.7. Engine Settings related to Event Metadata

2.4.7.1. Java Class Property Names and Case Sensitivity

As discussed in Section 2.4.1.5, “Case Sensitivity and Property Names”, this setting controls case sensitivity for Java event class properties of all Java classes as a default, rather than at a class level.

The next code snippet shows how to control this feature via the API:

Configuration config = new Configuration();
config.getEngineDefaults().getEventMeta().setClassPropertyResolutionStyle(
    Configuration.PropertyResolutionStyle.CASE_INSENSITIVE);

2.4.8. Engine Settings related to View Resources

2.4.8.1. Sharing View Resources between Statements

The engine by default attempts to optimize resource usage and thus re-uses or shares views between statements that declare the same views. However, in multi-threaded environments, this can lead to reduced concurrency as locking for shared view resources must take place. This setting turns off view sharing to allow higher concurrency in multi-threaded processing.

The next code snippet outlines the API to turn off view resource sharing between statements:

Configuration config = new Configuration();
config.getEngineDefaults().getViewResources().setShareViews(false);

2.4.9. Engine Settings related to Logging

2.4.9.1. Execution Path Debug Logging

By default, the engine does not produce debug output for the event processing execution paths even when Log4j or Logger configurations have been set to output debug level logs. To enable debug level logging, set this option in the configuration as well as in your Log4j configuration file.

The API to use to enable debug logging is shown here:

Configuration config = new Configuration();
config.getEngineDefaults().getLogging().setEnableExecutionDebug(true);

Note: this is a configuration option that applies to all engine instances of a given Java module or VM.

Chapter 3. API Reference

3.1. API Overview

Esper has two primary interfaces that this section outlines: the administrative interface and the runtime interface.

Use Esper's administrative interface to create and manage EQL and pattern statements, and set runtime configurations, as discussed in Section 6.1, “EQL Introduction” and Section 7.1, “Event Pattern Overview”.

Use Esper's runtime interface to send events into the engine, emit events and get statistics for an engine instance.

The JavaDoc documentation is also a great source for API information.

3.2. Engine Instances

Each instance of an Esper engine is completely independent of other engine instances and has its own administrative and runtime interface.

An instance of the Esper engine is obtained via static methods on the EPServiceProviderManager class. The getDefaultProvider method and the getProvider(String URI) method return an instance of the Esper engine. The latter can be used to obtain multiple instances of the engine for different URI values. The EPServiceProviderManager determines whether the URI matches any prior URI value and, if so, returns the same engine instance for that URI value. If the URI has not been seen before, it creates a new engine instance.

The code snippet below gets the default Esper engine instance. Subsequent calls to get the default engine instance return the same instance.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();

This code snippet gets an Esper engine for URI RFIDProcessor1. Subsequent calls to get an engine with the same URI return the same instance.

EPServiceProvider epService = EPServiceProviderManager.getProvider("RFIDProcessor1");

An existing Esper engine instance can be reset via the initialize method on the EPServiceProvider instance. This stops and removes all statements in the engine.
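The "same URI, same instance" behavior described in this section can be pictured as a registry keyed by URI. The sketch below is an illustration only: EngineRegistrySketch, its nested Engine class and the default URI value are hypothetical and are not how EPServiceProviderManager is necessarily implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry illustrating one engine instance per URI.
class EngineRegistrySketch {

    /** Stand-in for an engine instance. */
    static final class Engine {
        final String uri;

        Engine(String uri) {
            this.uri = uri;
        }
    }

    // Assumed key for the default engine; the real value is not stated in this document.
    private static final String DEFAULT_URI = "default";

    private static final ConcurrentMap<String, Engine> ENGINES = new ConcurrentHashMap<>();

    static Engine getDefaultProvider() {
        return getProvider(DEFAULT_URI);
    }

    static Engine getProvider(String uri) {
        // Creates the engine on first use and returns the existing one afterwards.
        return ENGINES.computeIfAbsent(uri, Engine::new);
    }
}
```

Because computeIfAbsent is atomic per key, two threads asking for the same URI at the same time receive the same instance in this sketch.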

3.3. The Administrative Interface

3.3.1. Creating Statements

Create event pattern expression and EQL statements via the administrative interface EPAdministrator.

This code snippet gets an Esper engine then creates an event pattern and an EQL statement.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();

EPStatement tenSecRecurTrigger = admin.createPattern(
  "every timer:at(*, *, *, *, *, */10)");

EPStatement countStmt = admin.createEQL(
  "select count(*) from MarketDataBean.win:time(60 sec)");

Note that event pattern expressions can also occur within EQL statements. This is outlined in more detail in Section 6.4.2, “Pattern-based event streams”.

The create methods on EPAdministrator are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.

The createPattern and createEQL methods return EPStatement instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop and start methods shown in the code snippet below.

countStmt.stop();
countStmt.start();

3.3.2. Adding Listeners

We can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement. We need to provide an implementation of the UpdateListener or the StatementAwareUpdateListener interface to the statement:

UpdateListener myListener = new MyUpdateListener();
countStmt.addListener(myListener);

EQL statements and event patterns publish old data and new data to registered UpdateListener listeners. New data published by statements consists of the events representing the new values of derived data held by the statement. Old data published by statements consists of the events representing the prior values of derived data held by the statement.

A second listener interface is the StatementAwareUpdateListener interface. A StatementAwareUpdateListener is especially useful for registering the same listener object with multiple statements, as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.

StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener();
statement.addListener(myListener);

To indicate results the engine invokes this method on StatementAwareUpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)

3.3.3. Using Iterators

Subscribing to events posted by a statement follows a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, statements can also serve up data in a pull model via the iterator method. This can come in handy if we are not interested in all new updates, but only want to poll periodically for the latest data. For example, an event pattern that fires every 5 seconds could be used to pull data from an EQL statement. The code snippet below demonstrates some pull code.

Iterator<EventBean> eventIter = countStmt.iterator();
while (eventIter.hasNext()) {
   EventBean event = eventIter.next();
   // .. do something ..
}

This is a second example:

double averagePrice = (Double) eqlStatement.iterator().next().get("average");

The iterator method can be used to pull results out of most statements, including statements that contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.

For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.

Esper places the following restrictions on the pull API and usage of the iterator method:

  1. EQL statements joining multiple event streams do not support the pull API.

  2. Since the iterator method returns events to the application immediately, the iterator does not honor an output rate limiting clause, if present.

  3. In multithreaded applications, the iterator method does not hold any locks and modifications to the underlying data window may throw runtime exceptions in the face of concurrent modifications.
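The third restriction resembles the fail-fast behavior of the standard Java collections. The sketch below uses java.util.ArrayList purely as a stand-in for a data window, since the engine's internal classes are not described here, and simulates the concurrent change within a single thread so the outcome is deterministic. The snapshot method shows one possible application-level mitigation, assuming the application can make the threads that send events and the thread that pulls data share a lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

// Stand-in illustration; ArrayList is not the engine's data window implementation.
class PullIterationSketch {

    /** Returns true if changing the list during iteration raised an exception. */
    static boolean failsWhenModifiedDuringIteration() {
        List<String> window = new ArrayList<>(Arrays.asList("e1", "e2", "e3"));
        Iterator<String> it = window.iterator();
        try {
            while (it.hasNext()) {
                it.next();
                window.add("late"); // simulates another thread changing the window
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    /** Copies the contents under a lock so the copy can be iterated safely afterwards. */
    static List<String> snapshot(List<String> window, Object lock) {
        synchronized (lock) {
            return new ArrayList<>(window);
        }
    }
}
```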

3.3.4. Managing Statements

The EPAdministrator interface provides the facilities for managing statements:

  • Use the getStatement method to obtain an existing started or stopped statement by name

  • Use the getStatementNames method to obtain a list of started and stopped statement names

  • Use the startAllStatements, stopAllStatements and destroyAllStatements methods to manage all statements in one operation

3.3.5. Runtime Engine Configuration

Certain configuration changes can be performed on an engine instance while it is in operation. Such configuration operations are available via the getConfiguration method on EPAdministrator, which returns a ConfigurationOperations object.

The configuration operations available on a running engine instance are as follows. Please see Chapter 2, Configuration for more information.

  • Add a new event type for a JavaBean class, legacy Java class or custom Java class

  • Add a new DOM XML event type

  • Add a new Map-based event type

3.4. The Runtime Interface

The EPRuntime interface is used to send events for processing into an Esper engine, and to emit events from an engine instance to the outside world.

The code snippet below shows how to send a Java object event to the engine. Note that the sendEvent method is overloaded. As events can take on different representation classes in Java, the sendEvent method takes parameters that reflect the different types of events that can be sent into the engine. Chapter 5, Event Representations explains the types of events accepted.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();

// Send an example event containing stock market data
runtime.sendEvent(new MarketDataBean("IBM", 75.0));

Events, in theoretical terms, are observations of a state change that occurred in the past. Since one cannot change an event that happened in the past, events are best modelled as immutable objects.

Note that the Esper engine relies on events that are sent into an engine to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Applications should not modify an existing event that was sent into the engine.

Another important method in the runtime interface is the route method. This method is designed for use by UpdateListener implementations that need to send events into an engine instance.

The emit and addEmittedListener methods can be used to emit events from a runtime to a registered set of one or more emitted event listeners. This mechanism is available as a service to enable channel-based publish-subscribe of events emitted from an engine instance via the emit method. Emitting events is not integrated with EQL and is available only via the EPRuntime interface. Events are emitted on an event channel identified by a name. Listeners are implementations of the EmittedListener interface. Via the addEmittedListener method a listener can be added to the specified event channel. The listener receives only those events posted to that channel. The channel parameter to addEmittedListener also allows null values. If a null channel value is specified, the listener receives emitted events posted on any channel.
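The channel semantics, including the null channel that receives everything, can be sketched with a small stand-in class. EmitChannelSketch is hypothetical: it borrows the method names emit and addEmittedListener from the text, but its parameter lists are assumptions and java.util.function.Consumer takes the place of the EmittedListener interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for channel-based emitting; not the engine's implementation.
class EmitChannelSketch {

    private static final class Subscription {
        final Consumer<Object> listener;
        final String channel; // null means "any channel"

        Subscription(Consumer<Object> listener, String channel) {
            this.listener = listener;
            this.channel = channel;
        }
    }

    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    /** Registers a listener for one channel, or for all channels if channel is null. */
    void addEmittedListener(Consumer<Object> listener, String channel) {
        subscriptions.add(new Subscription(listener, channel));
    }

    /** Delivers the event to listeners of the named channel and to null-channel listeners. */
    void emit(Object event, String channel) {
        for (Subscription subscription : subscriptions) {
            if (subscription.channel == null || subscription.channel.equals(channel)) {
                subscription.listener.accept(event);
            }
        }
    }
}
```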

3.5. Time-Keeping Events

Special events are provided that can be used to control the time-keeping of an engine instance. There are two models for an engine to keep track of time. Internal clocking is when the engine instance relies on the java.util.Timer class for time tick events. External clocking can be used to supply time ticks to the engine. The latter is useful for testing time-based event sequences or for synchronizing the engine with an external time source.

By default, the Esper engine uses internal time ticks. This behavior can be changed by sending a timer control event to the engine as shown below.

EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();
// switch to external clocking
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));

// send a time tick
long timeInMillis = System.currentTimeMillis(); // Or get the time somewhere else
runtime.sendEvent(new CurrentTimeEvent(timeInMillis));

We recommend that when disabling the internal timer, applications send an external timer event setting the start time before creating statements, such that statement start time is well-defined.

3.6. Events Received from the Engine

The Esper engine posts events to registered UpdateListener instances ('push' method for receiving events). For many statements events can also be pulled from statements via the iterator method. Both pull and push supply EventBean instances representing the events generated by the engine or events supplied to the engine. Each EventBean instance represents an event, with each event being either an artificial event, composite event or an event supplied to the engine via its runtime interface.

The getEventType method supplies an event's event type information represented by an EventType instance. The EventType supplies event property names and types as well as information about the underlying object to the event.

The engine may generate artificial events that contain information derived from event streams. A typical example for artificial events is the events posted for a statement to calculate univariate statistics on an event property. The below example shows such a statement and queries the generated events for an average value.

// Derive univariate statistics on price for the last 100 market data events
String stmt = "select * from MarketDataBean(symbol='IBM').win:length(100).stat:uni('price')";
EPStatement priceStatsView = epService.getEPAdministrator().createEQL(stmt);
priceStatsView.addListener(testListener);

// Example listener code
public class MyUpdateListener implements UpdateListener
{
    public void update(EventBean[] newData, EventBean[] oldData)
    {
        // Interrogate events
        System.out.println("new average price=" + newData[0].get("average"));
    }
}

Composite events are events that aggregate one or more other events. Composite events are typically created by the engine for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.

// Look for a pattern where BEvent follows AEvent
String pattern = "a=AEvent -> b=BEvent";
EPStatement stmt = epService.getEPAdministrator().createPattern(pattern);
stmt.addListener(testListener);

// Example listener code
public class MyUpdateListener implements UpdateListener
{
    public void update(EventBean[] newData, EventBean[] oldData)
    {
        System.out.println("a event=" + newData[0].get("a").getUnderlying());
        System.out.println("b event=" + newData[0].get("b").getUnderlying());
    }
}

Note that the update method can receive multiple events at once as it accepts an array of EventBean instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.

Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id property within 60 seconds of the A event. The engine may deliver all matching A events as an array of events in a single invocation of the update method of each listener to the statement:

every a=A -> (timer:interval(60 sec) and not B(id=a.id))

3.7. Engine Threading and Concurrency

Esper is designed from the ground up to operate as a component of multi-threaded, highly concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.

In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance efficiently shares resources between statements. For example, consider two statements that declare the same data window. The engine matches up view declarations provided by each statement and can thus provide a single data window representation shared between the two statements.

Applications can use Esper APIs concurrently, from multiple threads of execution, to perform such functions as creating and managing statements, or sending events into an engine instance for processing. Applications can use one or more thread pools or any set of same or different threads of execution with any of the public Esper APIs. There are no restrictions on threading other than those noted in specific sections of this document.

Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process. It is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes place within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default).

The fact that event processing takes place within an application thread call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.

To send events into an engine concurrently by multiple execution threads, typically applications use the Java java.lang.Thread or java.lang.Runnable classes or Java 5 concurrent utilities that include abstractions for thread pools and blocking in-memory queues.
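As a sketch of the thread-pool approach mentioned above, the following code sends events from several pool threads. The class ConcurrentSendSketch is hypothetical, and a java.util.function.Consumer stands in for the engine's send operation so the example runs without an engine; in an application the consumer could be a lambda that calls sendEvent on the EPRuntime.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch; the Consumer stands in for a call such as runtime.sendEvent(event).
class ConcurrentSendSketch {

    /** Sends events from a fixed pool; returns true if all producers finished in time. */
    static boolean sendAll(Consumer<Object> sendEvent, int threads, int eventsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int producer = t;
            pool.execute(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    // Each event is a new object and is not modified after sending.
                    sendEvent.accept("event-" + producer + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            return pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Since event processing takes place in the calling thread's call stack, the degree of parallelism in this arrangement is set by the size of the pool chosen by the application.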

Each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.

Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on a statement level to protect statement resources.

For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. Applications that require the highest possible concurrency and do not require predictable order of delivery of events to listeners, this feature can be turned off via configuration.

In multithreaded environments, when one or more statements make result events available via the insert into clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statement's events to behave deterministic. This feature can also be turned off via configuration.

We generally recommend that listener implementations do not block. By implementing listener code as non-blocking code, execution threads can often achieve higher levels of concurrency.
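One way to keep a listener from blocking is to hand results to an in-memory queue and do slow work on a separate thread. The following is a sketch under assumptions: ResultListener is a simplified stand-in for the listener callback (it takes plain Object arrays), and the class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class NonBlockingListenerSketch {

    /** Simplified stand-in for a listener callback; not the Esper interface itself. */
    interface ResultListener {
        void update(Object[] newEvents, Object[] oldEvents);
    }

    /** Listener that only enqueues new events and returns immediately. */
    static final class QueueingListener implements ResultListener {
        private final BlockingQueue<Object> queue;

        QueueingListener(BlockingQueue<Object> queue) {
            this.queue = queue;
        }

        public void update(Object[] newEvents, Object[] oldEvents) {
            if (newEvents == null) {
                return;
            }
            for (Object event : newEvents) {
                queue.offer(event); // unbounded queue: offer does not wait for space
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Object> handled = new ArrayList<>();
        // The worker thread performs the potentially slow handling of results.
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    handled.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        new QueueingListener(queue).update(new Object[] {"W1", "W2"}, null);
        worker.join();
        System.out.println(handled);
    }
}
```

An unbounded queue trades memory for non-blocking behavior; a bounded queue would need a policy for what to do when it is full.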

Chapter 4. Understanding the Output Model

4.1. Introduction

The Esper output model is continuous: Update listeners to statements receive updated data as soon as the engine processes events for that statement, according to the statement's choice of event streams, views, filters and output rates.

As outlined in Chapter 3, API Reference, the interface for listeners is net.esper.client.UpdateListener. Implementations must provide a single update method, which the engine invokes when results become available.

The engine provides statement results to update listeners by placing results in net.esper.event.EventBean instances. A typical listener implementation queries the EventBean instances via getter methods to obtain the statement-generated results.

The get method on the EventBean interface can be used to retrieve result columns by name. The property name supplied to the get method can also be used to query nested, indexed or array properties of object graphs as discussed in more detail in Chapter 5, Event Representations.

The getUnderlying method on the EventBean interface allows update listeners to obtain the underlying event object. For wildcard selects, the underlying event is the event object that was sent into the engine via the sendEvent method. For joins and select clauses with expressions, the underlying object implements java.util.Map.
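A listener along these lines might look as follows. This is a sketch only: the EventBean and UpdateListener interfaces below are local stand-ins for net.esper.event.EventBean and net.esper.client.UpdateListener, reduced to the methods discussed here, and the update signature with separate arrays for new and old events is an assumption. MapEventBean and AmountListener are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ListenerSketch {

    /** Stand-in for net.esper.event.EventBean, reduced to the two methods discussed above. */
    interface EventBean {
        Object get(String propertyName);
        Object getUnderlying();
    }

    /** Stand-in for net.esper.client.UpdateListener (signature assumed). */
    interface UpdateListener {
        void update(EventBean[] newEvents, EventBean[] oldEvents);
    }

    /** Hypothetical Map-backed EventBean, used only to drive the listener in this sketch. */
    static final class MapEventBean implements EventBean {
        private final Map<String, Object> row;

        MapEventBean(Map<String, Object> row) {
            this.row = row;
        }

        public Object get(String propertyName) {
            return row.get(propertyName);
        }

        public Object getUnderlying() {
            return row;
        }
    }

    /** Listener that sums the "amount" column of all new events it receives. */
    static final class AmountListener implements UpdateListener {
        double total;

        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            if (newEvents == null) {
                return;
            }
            for (EventBean event : newEvents) {
                total += ((Number) event.get("amount")).doubleValue();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("account", "A-1");
        row.put("amount", 250.0);
        AmountListener listener = new AmountListener();
        listener.update(new EventBean[] {new MapEventBean(row)}, null);
        System.out.println("total amount: " + listener.total);
    }
}
```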

4.2. Insert Stream

In this section we look at the output of a very simple EQL statement. The statement selects an event stream without using a data window and without applying any filtering, as follows:

select * from Withdrawal

This statement selects all Withdrawal events. Every time the engine processes an event of type Withdrawal or any sub-type of Withdrawal, it invokes all update listeners, handing the new event to each of the statement's listeners.

The term insert stream denotes the new events arriving, and entering a data window or aggregation. The insert stream in this example is the stream of arriving Withdrawal events, and is posted to listeners as new events.

The diagram below shows a series of Withdrawal events 1 to 6 arriving over time. The number in parentheses is the withdrawal amount, an event property that is used in the examples that discuss filtering.

Figure 4.1. Output example for a simple statement

The example statement above results in only new events and no old events posted by the engine to the statement's listeners.

4.3. Insert and Remove Stream

A length window instructs the engine to only keep the last N events for a stream. The next statement applies a length window onto the Withdrawal event stream. The statement serves to illustrate the concept of data window and events entering and leaving a data window:

select * from Withdrawal.win:length(5)

The size of this statement's length window is five events. The engine enters all arriving Withdrawal events into the length window. When the length window is full, the oldest Withdrawal event is pushed out of the window. The engine indicates to listeners all events entering the window as new events, and all events leaving the window as old events.

While the term insert stream denotes new events arriving, the term remove stream denotes events leaving a data window, or changing aggregation values. In this example, the remove stream is the stream of Withdrawal events that leave the length window, and such events are posted to listeners as old events.

The next diagram illustrates how the length window contents change as events arrive and shows the events posted to an update listener.

Figure 4.2. Output example for a length window

As before, all arriving events are posted as new events to listeners. In addition, when event W1 leaves the length window on arrival of event W6, it is posted as an old event to listeners.
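The behavior of the length window can be imitated in a few lines of plain Java. The sketch below is a simulation for illustration, not engine code; LengthWindowSketch is a hypothetical class, and events are represented by the labels W1 to W6.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class LengthWindowSketch {
    private final int size;
    private final Deque<String> window = new ArrayDeque<>();

    LengthWindowSketch(int size) {
        this.size = size;
    }

    /** Adds an event; returns the events pushed out of the window (posted as old events). */
    List<String> insert(String event) {
        List<String> old = new ArrayList<>();
        window.addLast(event);
        while (window.size() > size) {
            old.add(window.removeFirst());
        }
        return old;
    }

    public static void main(String[] args) {
        LengthWindowSketch w = new LengthWindowSketch(5);
        for (int i = 1; i <= 6; i++) {
            String event = "W" + i;
            // Every arriving event is a new event; W6 additionally pushes out W1.
            System.out.println("new=" + event + " old=" + w.insert(event));
        }
    }
}
```

The last line printed is `new=W6 old=[W1]`; the five lines before it show an empty list of old events.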

Similar to a length window, a time window also keeps the most recent events up to a given time period. A time window of 5 seconds, for example, keeps the last 5 seconds of events. As seconds pass, the time window actively pushes the oldest events out of the window resulting in one or more old events posted to update listeners.

Note: EQL supports optional istream and rstream keywords on select-clauses and on insert-into clauses. These instruct the engine to only forward events that enter or leave data windows, or select only current or prior aggregation values, i.e. the insert stream or the remove stream.

4.4. Filters and Where-clauses

Filters to event streams allow filtering events out of a given stream before events enter a data window. The statement below shows a filter that selects Withdrawal events with an amount value of 200 or more.

select * from Withdrawal(amount>=200).win:length(5)

With the filter, any Withdrawal events that have an amount of less than 200 do not enter the length window and are therefore not passed to update listeners. Filters are discussed in more detail in Section 6.4.1, “Filter-based event streams” and Section 7.4, “Filter Expressions In Patterns”.

Figure 4.3. Output example for a statement with an event stream filter

The where-clause and having-clause in statements eliminate potential result rows at a later stage in processing, after events have been processed into a statement's data window or other views.

The next statement applies a where-clause to Withdrawal events. Where-clauses are discussed in more detail in Section 6.5, “Specifying Search Conditions: the Where Clause”.

select * from Withdrawal.win:length(5) where amount >= 200

The where-clause applies to both new events and old events. As the diagram below shows, all arriving events enter the window; however, only events that pass the where-clause are handed to update listeners. Also, as events leave the data window, only those events that pass the conditions in the where-clause are posted to listeners as old events.

Figure 4.4. Output example for a statement with where-clause
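The difference between a stream filter and a where-clause can be made concrete with a small simulation. This is a sketch, not engine code: the class name, the window size of 2 and the amounts are assumptions chosen to keep the output short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class FilterVersusWhereSketch {

    /** Runs a length window over the amounts; returns a log of posted new and old events. */
    static List<String> run(int[] amounts, int windowSize, boolean filterBeforeWindow) {
        Deque<Integer> window = new ArrayDeque<>();
        List<String> posted = new ArrayList<>();
        for (int amount : amounts) {
            boolean passes = amount >= 200;
            if (filterBeforeWindow && !passes) {
                continue; // stream filter: the event never enters the window
            }
            window.addLast(amount);
            if (passes) {
                posted.add("new:" + amount);
            }
            if (window.size() > windowSize) {
                int leaving = window.removeFirst();
                if (leaving >= 200) { // the condition also applies to old events
                    posted.add("old:" + leaving);
                }
            }
        }
        return posted;
    }

    public static void main(String[] args) {
        int[] amounts = {500, 50, 50, 300}; // hypothetical withdrawal amounts
        System.out.println("filter: " + run(amounts, 2, true));
        System.out.println("where:  " + run(amounts, 2, false));
    }
}
```

With the filter, the two small withdrawals never occupy the window, so the event with amount 500 is still in the window at the end. With the where-clause, the small withdrawals do enter the window and push the event with amount 500 out, which is then posted as an old event.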

The where-clause can contain complex conditions, while event stream filters are more restrictive in the type of filters that can be specified. The where-clause of the next statement applies the ceil function of the java.lang.Math Java library class. The insert-into clause makes the results of the first statement available to the second statement:

insert into WithdrawalFiltered select * from Withdrawal where Math.ceil(amount) >= 200

select * from WithdrawalFiltered

4.5. Time Windows

In this section we explain the output model of statements employing a time window view and a time batch view.

4.5.1. Time Window

A time window is a moving window extending to the specified time interval into the past based on the system time. Time windows enable us to limit the number of events considered by a query, as do length windows.

As a practical example, consider the need to determine all accounts where the average withdrawal amount per account for the last 4 seconds of withdrawals is greater than 1000. The statement to solve this problem is shown below.

select account, avg(amount) 
from Withdrawal.win:time(4 sec) 
group by account
having avg(amount) > 1000

The next diagram serves to illustrate the functioning of a time window. For the diagram, we assume a query that simply selects the event itself and does not group or filter events.

select * from Withdrawal.win:time(4 sec)

The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.

Figure 4.5. Output example for a statement with a time window

The activity as illustrated by the diagram:

  1. At time t + 4 seconds an event W1 arrives and enters the time window. The engine reports the new event to update listeners.

  2. At time t + 5 seconds an event W2 arrives and enters the time window. The engine reports the new event to update listeners.

  3. At time t + 6.5 seconds an event W3 arrives and enters the time window. The engine reports the new event to update listeners.

  4. At time t + 8 seconds event W1 leaves the time window. The engine reports the event as an old event to update listeners.
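The steps above can be replayed with a small simulation. This is a sketch rather than engine code: TimeWindowSketch is a hypothetical class, times are given in milliseconds relative to t, and time is advanced by explicit calls instead of a timer thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TimeWindowSketch {

    /** An event label together with its arrival time. */
    private static final class Entry {
        final String event;
        final long arrival;

        Entry(String event, long arrival) {
            this.event = event;
            this.arrival = arrival;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> window = new ArrayDeque<>();

    TimeWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** An arriving event enters the window; it is reported to listeners as a new event. */
    void arrive(String event, long now) {
        window.addLast(new Entry(event, now));
    }

    /** Advances time; returns the events that leave the window (reported as old events). */
    List<String> advanceTo(long now) {
        List<String> old = new ArrayList<>();
        while (!window.isEmpty() && now - window.peekFirst().arrival >= windowMillis) {
            old.add(window.removeFirst().event);
        }
        return old;
    }

    public static void main(String[] args) {
        TimeWindowSketch w = new TimeWindowSketch(4000);
        w.arrive("W1", 4000);
        w.arrive("W2", 5000);
        w.arrive("W3", 6500);
        System.out.println("t+7s old=" + w.advanceTo(7000));
        System.out.println("t+8s old=" + w.advanceTo(8000));
    }
}
```

At t + 7 seconds no event has left the window; at t + 8 seconds W1 has been in the window for the full 4 seconds and is reported as an old event.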

4.5.2. Time Batch

The time batch view buffers events and releases them every specified time interval in one update. Time batch views control when events are evaluated, as does the length batch view.

The next diagram serves to illustrate the functioning of a time batch view. For the diagram, we assume a simple query as below:

select * from Withdrawal.win:time_batch(4 sec)

The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.

Figure 4.6. Output example for a statement with a time batch view

The activity as illustrated by the diagram:

  1. At time t + 1 seconds an event W1 arrives and enters the batch. No call to inform update listeners occurs.

  2. At time t + 3 seconds an event W2 arrives and enters the batch. No call to inform update listeners occurs.

  3. At time t + 4 seconds the engine processes the batched events and starts a new batch. The engine reports events W1 and W2 to update listeners.

  4. At time t + 6.5 seconds an event W3 arrives and enters the batch. No call to inform update listeners occurs.

  5. At time t + 8 seconds the engine processes the batched events and starts a new batch. The engine reports the event W3 as new data to update listeners. The engine reports the events W1 and W2 as old data (prior batch) to update listeners.
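The batching steps above can be sketched in plain Java as follows. The class TimeBatchSketch is hypothetical and simulates only the bookkeeping; the end of each interval is signaled by calling flush rather than by a timer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TimeBatchSketch {
    private List<String> current = new ArrayList<>();
    private List<String> previous = new ArrayList<>();

    /** An arriving event enters the current batch; listeners are not informed yet. */
    void arrive(String event) {
        current.add(event);
    }

    /** Ends the interval; returns a pair of lists: the new events, then the old events. */
    List<List<String>> flush() {
        List<String> newEvents = current;
        List<String> oldEvents = previous;
        previous = current;          // this batch becomes the prior batch
        current = new ArrayList<>(); // start a new batch
        return Arrays.asList(newEvents, oldEvents);
    }

    public static void main(String[] args) {
        TimeBatchSketch batch = new TimeBatchSketch();
        batch.arrive("W1");                // t + 1 seconds
        batch.arrive("W2");                // t + 3 seconds
        System.out.println(batch.flush()); // t + 4 seconds
        batch.arrive("W3");                // t + 6.5 seconds
        System.out.println(batch.flush()); // t + 8 seconds
    }
}
```

The first flush yields W1 and W2 as new events and no old events; the second yields W3 as the new event and W1 and W2 as old events.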

4.6. Aggregation and Grouping

4.6.1. Insert and Remove Stream

Statements that aggregate events via aggregation functions also post remove stream events as aggregated values change.

Consider the following statement that alerts when 2 Withdrawal events have been received:

select count(*) as mycount from Withdrawal having count(*) = 2

When the engine encounters the second withdrawal event, the engine posts a new event to update listeners. The value of the "mycount" property on that new event is 2. Additionally, when the engine encounters the third Withdrawal event, it posts an old event to update listeners containing the prior value of the count. The value of the "mycount" property on that old event is also 2.

The istream or rstream keyword can be used to eliminate either new events or old events posted to listeners. The next statement uses the istream keyword causing the engine to call the listener only once when the second Withdrawal event is received:

select istream count(*) as mycount from Withdrawal having count(*) = 2
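The following sketch imitates the output of the two count statements. It is a simulation under assumptions, not engine code: CountHavingSketch is a hypothetical class, and the strings it returns merely label a posted row as new or old.

```java
import java.util.ArrayList;
import java.util.List;

class CountHavingSketch {
    private final boolean istreamOnly;
    private long count;

    CountHavingSketch(boolean istreamOnly) {
        this.istreamOnly = istreamOnly;
    }

    /** Processes one Withdrawal event; returns the rows posted to listeners. */
    List<String> onEvent() {
        long prior = count;
        count++;
        List<String> posted = new ArrayList<>();
        if (count == 2) {
            posted.add("new:mycount=" + count); // having-clause passes for the current value
        }
        if (!istreamOnly && prior == 2) {
            posted.add("old:mycount=" + prior); // having-clause passes for the prior value
        }
        return posted;
    }

    public static void main(String[] args) {
        CountHavingSketch statement = new CountHavingSketch(false);
        for (int i = 1; i <= 3; i++) {
            System.out.println("event " + i + " -> " + statement.onEvent());
        }
    }
}
```

Without istream, the second event produces a new row and the third event produces an old row, both with a count of 2. Constructing the sketch with `true` models the istream variant, which posts only the new row.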

4.6.2. Output for Event Batches

The built-in data windows that act on batches of events are the win:time_batch and the win:length_batch views. The win:time_batch data window collects events arriving during a given time interval and posts collected events as a batch to listeners at the end of the time interval. The win:length_batch data window collects a given number of events and posts collected events as a batch to listeners when the given number of events has been collected.

Let's look at how a time batch window may be used:

select account, amount from Withdrawal.win:time_batch(1 sec)

The above statement collects events arriving during a one-second interval, at the end of which the engine posts the collected events as new events (insert stream) to each listener. The engine posts the events collected during the prior batch as old events (remove stream). The engine starts posting events to listeners one second after it receives the first event, and every second thereafter.

For statements containing aggregation functions and/or a group by clause, the engine posts consolidated aggregation results for an event batch. For example, consider the following statement:

select sum(amount) as mysum from Withdrawal.win:time_batch(1 sec)

Following SQL (Structured Query Language) standards for queries against relational databases, the presence or absence of aggregation functions and the presence or absence of the group by clause dictate the number of rows posted by the engine to listeners at the end of a batch. The next sections outline the output model for batched events under aggregation and grouping.

Note that output rate limiting also generates batches of events following the output model as discussed here.

4.6.2.1. Un-aggregated and Un-grouped

An example statement for the un-aggregated and un-grouped case is as follows:

select * from Withdrawal.win:time_batch(1 sec)

At the end of a time interval, the engine posts to listeners one row for each event arriving during the time interval.

4.6.2.2. Fully Aggregated and Un-grouped

If your statement only selects aggregation values and does not group, your statement may look like the example below:

select sum(amount) 
from Withdrawal.win:time_batch(1 sec)

At the end of a time interval, the engine posts to listeners a single row indicating the aggregation result. The aggregation result aggregates all events collected during the time interval.

4.6.2.3. Aggregated and Un-Grouped

If your statement selects non-aggregated properties and aggregation values, and does not group, your statement may be similar to this statement:

select account, sum(amount) 
from Withdrawal.win:time_batch(1 sec)

At the end of a time interval, the engine posts to listeners one row per event. The aggregation result aggregates all events collected during the time interval.
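To contrast the two un-grouped cases, the sketch below computes the rows for a single batch. The class name, account labels and amounts are hypothetical, and rows are rendered as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class UngroupedBatchSketch {

    static int sum(int[] amounts) {
        int total = 0;
        for (int amount : amounts) {
            total += amount;
        }
        return total;
    }

    /** select sum(amount): a single row holding the sum over the whole batch. */
    static List<String> fullyAggregated(int[] amounts) {
        return Collections.singletonList("sum=" + sum(amounts));
    }

    /** select account, sum(amount): one row per event, each carrying the batch sum. */
    static List<String> aggregated(String[] accounts, int[] amounts) {
        int total = sum(amounts);
        List<String> rows = new ArrayList<>();
        for (String account : accounts) {
            rows.add(account + ",sum=" + total);
        }
        return rows;
    }

    public static void main(String[] args) {
        String[] accounts = {"A", "B", "A"}; // hypothetical batch of three events
        int[] amounts = {100, 200, 50};
        System.out.println(fullyAggregated(amounts));
        System.out.println(aggregated(accounts, amounts));
    }
}
```

For this batch the fully aggregated case yields one row with a sum of 350, while the aggregated case yields three rows, one per event, each with the same sum of 350.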

4.6.2.4. Fully Aggregated and Grouped

If your statement selects aggregation values and all non-aggregated properties in the select clause are listed in the group by clause, then your statement may look similar to this example:

select account, sum(amount) 
from Withdrawal.win:time_batch(1 sec) 
group by account

At the end of a time interval, the engine posts to listeners one row per unique account number. The aggregation result aggregates per unique account.

4.6.2.5. Aggregated and Grouped

If your statement selects non-aggregated properties and aggregation values, and groups only some properties using the group by clause, your statement may look like the one below:

select account, accountName, sum(amount) 
from Withdrawal.win:time_batch(1 sec) 
group by account

At the end of a time interval, the engine posts to listeners one row per event. The aggregation result aggregates per unique account.
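The two grouped cases differ only in the number of rows posted, which the following sketch illustrates for a single batch. The class name, account labels, account names and amounts are hypothetical, and rows are rendered as plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupedBatchSketch {

    /** Sums amounts per account, keeping accounts in order of first appearance. */
    static Map<String, Integer> sumPerAccount(String[] accounts, int[] amounts) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < accounts.length; i++) {
            sums.merge(accounts[i], amounts[i], Integer::sum);
        }
        return sums;
    }

    /** select account, sum(amount) ... group by account: one row per unique account. */
    static List<String> fullyAggregated(String[] accounts, int[] amounts) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sumPerAccount(accounts, amounts).entrySet()) {
            rows.add(e.getKey() + ",sum=" + e.getValue());
        }
        return rows;
    }

    /** select account, accountName, sum(amount) ... group by account: one row per event. */
    static List<String> aggregated(String[] accounts, String[] names, int[] amounts) {
        Map<String, Integer> sums = sumPerAccount(accounts, amounts);
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < accounts.length; i++) {
            rows.add(accounts[i] + "," + names[i] + ",sum=" + sums.get(accounts[i]));
        }
        return rows;
    }

    public static void main(String[] args) {
        String[] accounts = {"A", "B", "A"}; // hypothetical batch of three events
        String[] names = {"Alice", "Bob", "Alice"};
        int[] amounts = {100, 200, 50};
        System.out.println(fullyAggregated(accounts, amounts));
        System.out.println(aggregated(accounts, names, amounts));
    }
}
```

For this batch the fully aggregated and grouped case yields two rows (account A with a sum of 150 and account B with a sum of 200), while the aggregated and grouped case yields three rows, one per event, each carrying the sum for its account.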

4.7. EventBean Query Results

The engine posts events to UpdateListener implementations as net.esper.event.EventBean instances. The EventBean represents a row (event) in your continuous query's result set.

Use the iterator method on EPStatement statements to poll or read data out of statements, if you require read-based access to statement result sets. Statement iterators also return EventBean instances.

The EventBean interface offers property type metadata via the getEventType method returning an EventType. The EventType provides property name, property type and underlying type information. This information can be useful to dynamically interrogate query results. The underlying event that an EventBean represents can be obtained via the getUnderlying method. Please see Chapter 5, Event Representations