Event Stream Intelligence: Esper & NEsper

Solution Patterns

Key Concepts

  1. What is CEP?
  2. What is an event?
  3. What is an event stream or simply "stream"?
  4. What is an event cloud compared to a stream?

Introductory

  1. How do I look for specific events on a stream, dropping the unwanted events?
  2. How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?
  3. How do I limit the unbounded events of a stream to a defined timespan or number of events?
  4. How do I correlate events from one or several streams on a common set of values?

General

  1. How do I measure the rate of arrival of events in a given time period?
  2. How do I measure the rate of arrival of events in a given time period per another category?
  3. How do I correlate events arriving in 2 or more streams?
  4. How do I correlate events arriving out-of-order?
  5. How do I use patterns to correlate events arriving in-order or out-of-order?
  6. How to implement an On-Off window? How to detect events between other events?
  7. How do I correlate 3 events in a time window in which events have similar properties?
  8. How do I remove all events from a window and start over?
  9. How do I combine data windows and their expiry policies? Or define custom logic for removing events?
  10. How do I seed an empty data window from a filled data window?
  11. How do I keep a separate window of events per category and compute aggregates for each category's window?
  12. How do I use results of one statement in another statement?
  13. How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?
  14. How do I delay data? How do I compare against previous events?
  15. How do I detect the absence of an event?
  16. How do I detect the absence of an event and the presence of an event arriving too late?
  17. How do I report at a regular interval without any incoming events?
  18. How do I find missing events arriving in 2 or more streams that are correlated?
  19. How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?
  20. How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?
  21. How do I detect something really complex, like a triple-bottom pattern?
  22. Can I use an UpdateListener to listen to events and the iterator-based pull-API together?
  23. How do I stop an insert after a period of time?
  24. Can I use a regular expression (regexp) within a filter?
  25. How can I remove duplicates? What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?
  26. How do I remove or drop events from a stream? I have a dynamic set of negative filters?
  27. How do I detect a specific sequence of events and get all events that take part in this sequence?
  28. How to implement a sell trailing stop order?
  29. I have one listener for multiple statements, how do I track which statement generated a result?
  30. Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.
  31. I want to know if an event hits at least one registered query?
  32. I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select clause columns when a user enters an EPL query, how to do this?
  33. How do I store statements in a file? Is there a standard file storage?
  34. When to use a plug-in aggregation function and when to use a plug-in custom view?
  35. When to use on-demand fire-and-forget queries versus on-select predefined queries?
  36. How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?
  37. We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?
  38. Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?
  39. Can Esper support a distributed cache or data grid?
  40. What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?
  41. How do I query nested indexed events? Or more generally, event objects contained in event objects?

Key Concepts

What is CEP?

Complex Event Processing, or CEP, is primarily an event processing concept that deals with the task of processing multiple events with the goal of identifying the meaningful events within the event cloud.

CEP employs techniques such as detection of complex patterns of many events, event correlation and abstraction, event hierarchies, and relationships between events such as causality, membership, and timing, and event-driven processes.

(source: wikipedia.org).

What is an event?

An event is an immutable record of a past occurrence of an action or state change. Event properties capture the useful information for an event.

Or, more simply: "something that happens" (source: webster.com).

Typically, the following is true for an event:

  • It's anchored in time.
  • It's not under your control.

An event can itself contain further events. Event properties may contain rich, nested, domain-specific information.

What is an event stream or simply "stream"?

A time-ordered sequence of events.

A stream is append-only: conceptually, one cannot remove events from it, only add them to the sequence.

A stream is unbounded, i.e. there is no end to the sequence {event1, event2, event3, event4, ..., eventN}.

What is an event cloud compared to a stream?

A stream is a time-ordered sequence of events, while a cloud is unordered.

For example, a valid stream is {{1s, event1}, {2s, event2}, {4s, event3}}.

A cloud is unordered, e.g. {{1s, event1}, {4s, event2}, {2s, event3}}.

Introductory

How do I look for specific events on a stream, dropping the unwanted events?

Consider a stream of temperature sensor events that provide a temperature value. This query looks for all sensor events (named SensorEvent) where the temperature is greater than 90:

select * from SensorEvent where temperature > 90
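
The filter behaves like a stateless predicate: each arriving event is tested on its own, and events that fail the test are simply dropped. The plain-Java sketch below illustrates only that semantics, applied to a list standing in for the stream. It does not use the Esper API, and the nested SensorEvent record (including its sensorId field) is a hypothetical stand-in for your event class.

```java
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
    // Hypothetical event type standing in for SensorEvent.
    record SensorEvent(String sensorId, double temperature) {}

    // Equivalent of "where temperature > 90": a stateless test applied to each event.
    static List<SensorEvent> hotEvents(List<SensorEvent> stream) {
        return stream.stream()
                .filter(e -> e.temperature() > 90)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SensorEvent> in = List.of(
                new SensorEvent("s1", 85.0),
                new SensorEvent("s2", 95.5),
                new SensorEvent("s3", 90.0));
        // Only s2 passes; 90.0 is not greater than 90.
        System.out.println(hotEvents(in));
    }
}
```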

How do I aggregate several (simple) events from a stream into a single new (complex) event summarizing event properties of the simple events?

This sample outputs the average temperature of all sensor events received from the start of the query:

select avg(temperature) from SensorEvent
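
Without a data window, avg() covers every event since the statement started, so conceptually only a running sum and count need to be kept. The following plain-Java sketch shows the value the statement would report after each event. It is not Esper code, and the RunningAverage class is hypothetical.

```java
class RunningAverage {
    private double sum;   // sum of all temperatures seen so far
    private long count;   // number of events seen so far

    // Called once per arriving SensorEvent; returns the updated average.
    double onEvent(double temperature) {
        sum += temperature;
        count++;
        return sum / count;
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        avg.onEvent(80);
        avg.onEvent(90);
        System.out.println(avg.onEvent(100)); // prints 90.0
    }
}
```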

How do I limit the unbounded events of a stream to a defined timespan or number of events?

There are many different flavors of data windows, which serve to limit the view onto the unbounded stream of data.

The next sample outputs the average temperature of all sensor events received within the last 1 minute:

select avg(temperature) from SensorEvent.win:time(1 min)
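
A time window retains each event for the given duration and then expires it, so the aggregate only reflects recent events. The sketch below approximates win:time(1 min) in plain Java with a deque ordered by arrival time. It is not the Esper API, and it makes three simplifications:

  • the caller supplies timestamps in milliseconds;
  • events are only expired when a new one arrives, whereas the engine also expires them as time passes;
  • an event exactly 60 seconds old is treated as expired, which is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TimeWindowAverage {
    private record Entry(long timeMillis, double value) {}

    private final long windowMillis;
    private final Deque<Entry> window = new ArrayDeque<>();
    private double sum;

    TimeWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds an event arriving at nowMillis, expires old events, returns the windowed average.
    double onEvent(long nowMillis, double temperature) {
        window.addLast(new Entry(nowMillis, temperature));
        sum += temperature;
        // Assumption: an event exactly windowMillis old counts as expired.
        while (window.peekFirst().timeMillis() <= nowMillis - windowMillis) {
            sum -= window.removeFirst().value();
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        TimeWindowAverage avg = new TimeWindowAverage(60_000);
        avg.onEvent(0, 70);
        avg.onEvent(30_000, 80);
        // The event at t=0 has expired, so this is (80 + 100) / 2.
        System.out.println(avg.onEvent(70_000, 100));
    }
}
```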

How do I correlate events from one or several streams on a common set of values?

Consider the stream of sensor events and a stream of sensor location events, assuming our sensors can move around geographically.

This sample query joins the last sensor event with the last sensor location event (SensorLocationEvent), matching the two on the sensor id (sensorId):

select temperature, coordinates 
from SensorEvent.std:lastevent() as sensor,
     SensorLocationEvent.std:lastevent() as loc
where sensor.sensorId = loc.sensorId

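You may ask why "std:lastevent()" is needed here. When joining two streams, we are often, as in this example, simply looking to join the last event of each stream; std:lastevent() retains only the most recent event of each stream. To retain the last event per sensor instead, std:unique(sensorId) could be used in its place.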
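
To make the std:lastevent() semantics concrete, here is a plain-Java sketch in which each side holds just its single most recent event. A joined row is produced only when both latest events carry the same sensorId. It is not Esper code, and all names are hypothetical.

```java
import java.util.Optional;

class LastEventJoin {
    record Sensor(String sensorId, double temperature) {}
    record Location(String sensorId, String coordinates) {}
    record Joined(double temperature, String coordinates) {}

    // Like std:lastevent(): each side retains only its single most recent event.
    private Sensor lastSensor;
    private Location lastLocation;

    Optional<Joined> onSensor(Sensor s) {
        lastSensor = s;
        return tryJoin();
    }

    Optional<Joined> onLocation(Location l) {
        lastLocation = l;
        return tryJoin();
    }

    // Equivalent of "where sensor.sensorId = loc.sensorId".
    private Optional<Joined> tryJoin() {
        if (lastSensor != null && lastLocation != null
                && lastSensor.sensorId().equals(lastLocation.sensorId())) {
            return Optional.of(new Joined(lastSensor.temperature(), lastLocation.coordinates()));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        LastEventJoin join = new LastEventJoin();
        join.onLocation(new Location("s1", "40.7,-74.0"));
        System.out.println(join.onSensor(new Sensor("s1", 72.5))); // a joined row
        System.out.println(join.onSensor(new Sensor("s2", 68.0))); // empty: latest location is for s1
    }
}
```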

General

How do I measure the rate of arrival of events in a given time period?

The time batch window of 1 second as shown below produces an event each second with the count of the number of events in the batch.

select count(*) as cnt from MarketDataEvent.win:time_batch(1 second)

How do I measure the rate of arrival of events in a given time period per another category?

We can solve this problem by grouping the events in the window by category. The example below uses 'feed' as the category.

select feed, count(*) as cnt from MarketDataEvent.win:time_batch(1 second) 
group by feed
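
A time batch window collects events for one interval and releases the whole batch at once, so each output covers a fresh, non-overlapping second. The plain-Java sketch below computes the same per-feed counts after the fact from caller-supplied timestamps. The names are hypothetical. Aligning batches to multiples of 1000 ms is a simplification and does not describe how the engine schedules batches.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TimeBatchCountPerFeed {
    record MarketDataEvent(long timeMillis, String feed) {}

    // Returns batchStart -> (feed -> count), like win:time_batch(1 second) with group by feed.
    static Map<Long, Map<String, Integer>> countPerBatch(List<MarketDataEvent> events, long batchMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (MarketDataEvent e : events) {
            // Simplification: batches are aligned to multiples of batchMillis.
            long batchStart = (e.timeMillis() / batchMillis) * batchMillis;
            result.computeIfAbsent(batchStart, k -> new TreeMap<>())
                  .merge(e.feed(), 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<MarketDataEvent> events = List.of(
                new MarketDataEvent(100, "A"), new MarketDataEvent(400, "B"),
                new MarketDataEvent(900, "A"), new MarketDataEvent(1200, "A"));
        System.out.println(countPerBatch(events, 1000)); // {0={A=2, B=1}, 1000={A=1}}
    }
}
```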

How do I correlate events arriving in 2 or more streams?

The join of event streams looks very similar to joins in SQL. To bind data in the streams together, across streams, we identify keys to join on.

The below example specifies the 'accountNumber' field as the only join key. In this example we hold the last 30 seconds of events for each stream.

select fraud.accountNumber as accntNum, withdraw.amount as amount         
from FraudWarningEvent.win:time(30 sec) as fraud,
     WithdrawalEvent.win:time(30 sec) as withdraw
where fraud.accountNumber = withdraw.accountNumber

How do I correlate events arriving out-of-order?

Let's assume we have three different types of events, all having a common attribute 'exchangeId'. Let's call the events start, finished and aborted.

Let's assume exactly one start event and multiple finished or aborted events for every exchangeId. The start event may arrive after the finished or aborted events, but they all arrive within, say, 30 seconds per exchangeId.

There are multiple possible answers to this problem. One solution is an outer join using time windows that selects the remove stream. We care about the composite events produced when a start event leaves the time window after 30 seconds, by which time the other events for the same exchange id have accumulated.

select rstream * from
  StartEvent.win:time(30 sec) start
    left outer join
  AbortedEvent.win:time(30 sec) abort
    on abort.exchangeId = start.exchangeId
    left outer join
  FinishedEvent.win:time(30 sec) finished
    on finished.exchangeId = start.exchangeId

In the example above, every time a StartEvent leaves its time window, the statement outputs it joined with the aborted and finished events for the same exchangeId. The abort property is null if no abort occurred.

Another solution is shown next using patterns to detect out-of-order events.

How do I use patterns to correlate events arriving in-order or out-of-order?

The prior discussion focused on 3 kinds of events: start, finished and aborted.

A second possible solution uses patterns. Suppose one doesn't really care about processing multiple aborted events and simply wants a failed or finished indication when the first aborted or finished event is encountered. Then a simpler approach is to specify a pattern for each interesting combination; some are shown below.

select * from pattern [every s=StartEvent -> 
  a=AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)]

The above pattern simply looks for aborted transactions and reacts to the first aborted event coming in after a start event. The pattern to detect finished transactions, i.e. where no abort occurred, should look about like this:

select * from pattern [every s=StartEvent -> 
  (f=FinishedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)
    and not AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec))]

To detect out-of-order events, the pattern can certainly be reversed:

select * from pattern [every a=AbortedEvent -> 
  s=StartEvent(exchangeId = a.exchangeId) where timer:within(30 sec)]

How to implement an On-Off window? How to detect events between other events?

A use case requires selecting all tuples that lie between two other tuples. For example, assume that I want all tuples between the first tuple with PARAM1=2 and the first tuple after it with PARAM2=0. In the example below, this would select all tuples with TIME between 3 and 8.

TIME | PARAM1 | PARAM2
   1 |      1 |      0
   2 |      1 |      1
   3 |      2 |      2    <== ON
   4 |      2 |      3
   5 |      3 |      5
   6 |      3 |      4
   7 |      2 |      3
   8 |      2 |      0    <== OFF
   9 |      3 |      1

This seems best solved with a pattern using followed-by and an unbounded repeat (until), such as:

select * from pattern [
  beginevent=Event(param1 = 2, param2 = 2) 
    -> middleevent=Event(param1 != beginevent.param1, param2 != 0) 
         until endevent=Event(param1 = beginevent.param1, param2 = 0)
  ]
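
The ON/OFF rule from the question can also be read as a two-state machine. It switches ON at the first tuple with PARAM1=2, collects everything, and switches OFF at the first later tuple with PARAM2=0. The plain-Java sketch below applies that rule to the sample table. It follows the prose description rather than reproducing the EPL pattern's filters one-to-one, and the Tuple record is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class OnOffWindow {
    record Tuple(int time, int param1, int param2) {}

    // Returns the TIME values from the ON tuple up to and including the OFF tuple.
    static List<Integer> timesBetween(List<Tuple> tuples) {
        List<Integer> selected = new ArrayList<>();
        boolean on = false;
        for (Tuple t : tuples) {
            if (!on) {
                if (t.param1() == 2) {   // first tuple with PARAM1=2 switches ON
                    on = true;
                    selected.add(t.time());
                }
            } else {
                selected.add(t.time());
                if (t.param2() == 0) {   // first later tuple with PARAM2=0 switches OFF
                    break;
                }
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Tuple> table = List.of(
                new Tuple(1, 1, 0), new Tuple(2, 1, 1), new Tuple(3, 2, 2),
                new Tuple(4, 2, 3), new Tuple(5, 3, 5), new Tuple(6, 3, 4),
                new Tuple(7, 2, 3), new Tuple(8, 2, 0), new Tuple(9, 3, 1));
        System.out.println(timesBetween(table)); // [3, 4, 5, 6, 7, 8]
    }
}
```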

How do I correlate 3 events in a time window in which events have similar properties?

My application needs to match 3 events, occurring within a time window, in which 3 different users submit trades with similar properties. The properties that must have the same value for each of the 3 matched events are the currency pair (ccypair) and direction. The pattern should match only if all 3 events have different users. The time window should be 10 minutes long.

The pattern that solves this problem is shown below. It uses the timer:within pattern guard to limit the lifetime of each active sub-expression to 10 minutes.

every trade1=Trade(userId in ('U1000','U1001','U1002') ) ->
  (trade2=Trade(userId in ('U1000','U1001','U1002') and
     userId != trade1.userId and ccypair = trade1.ccypair 
     and direction = trade1.direction) ->
   trade3=Trade(userId in ('U1000','U1001','U1002') and 
     userId not in (trade1.userId, trade2.userId) and
     ccypair = trade1.ccypair and direction = trade1.direction))
  where timer:within(10 min)

How do I remove all events from a window and start over?

Suppose you need a data window that detects a certain situation and, when that situation occurs, starts fresh by removing all events.

Named windows and the on-delete clause address this need. This sample declares a named window to hold MyEvent events:

create window MyWindow.win:keepall() as select * from MyEvent

Populate the named window from all arriving MyEvent events:

insert into MyWindow select * from MyEvent

Detect the situation. In the example below, the query looks at the average wait time per train station:

insert into WarningStream 
select trainStation, avg(waitTime) as avgWait
from MyWindow 
group by trainStation 
having avg(waitTime) > 60

Use the WarningStream events to remove all events from the named window:

on WarningStream delete from MyWindow
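
The three statements above can be approximated in plain Java to show the intended effect. The sketch keeps every event, re-checks the arriving event's station average, and empties the window as soon as that average exceeds 60. It is not Esper code and all names are hypothetical. It also collapses the named window, the warning statement and the on-delete into one method.

```java
import java.util.ArrayList;
import java.util.List;

class ResetOnWarning {
    record MyEvent(String trainStation, double waitTime) {}

    // Stand-in for MyWindow.win:keepall(): every event is retained until deleted.
    private final List<MyEvent> window = new ArrayList<>();

    // Inserts an event; returns true if the warning fired and the window was emptied.
    boolean insert(MyEvent e) {
        window.add(e);
        // Only the arriving event's station can have a changed average, so only it is checked.
        double sum = 0;
        int n = 0;
        for (MyEvent w : window) {
            if (w.trainStation().equals(e.trainStation())) {
                sum += w.waitTime();
                n++;
            }
        }
        if (sum / n > 60) {   // having avg(waitTime) > 60
            window.clear();   // on WarningStream delete from MyWindow
            return true;
        }
        return false;
    }

    int size() {
        return window.size();
    }

    public static void main(String[] args) {
        ResetOnWarning w = new ResetOnWarning();
        System.out.println(w.insert(new MyEvent("Central", 50))); // false: average 50
        System.out.println(w.insert(new MyEvent("North", 30)));   // false: average 30
        System.out.println(w.insert(new MyEvent("Central", 80))); // true: average 65, window cleared
        System.out.println(w.size());                             // 0
    }
}
```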

How do I combine data windows and their expiry policies? Or define custom logic for removing events?

The Esper documentation describes the built-in views, some of which combine length-based and time-based expiry.

Another good option is a named window. Named windows provide an on-delete clause that helps to build or combine a custom strategy for removing events.

In addition, multiple data windows can also be combined via the retain-union and retain-intersection keywords.

Next, we'll show the named window and on-delete option. Let's start with a named window that keeps the last 1 minute of events:

create window MyWindow.win:time(1 min) as select * from MyEvent

This example EPL removes from the named window those events that have the same id as the arriving MyDeleteEvent:

on MyDeleteEvent as d delete from MyWindow as w where w.id = d.id

This example EPL removes non-unique rows by category, so that only the last event for each category stays in the named window. It therefore selects the remove stream (rstream) of the unique window:

insert rstream into MyNonUnique select rstream id from MyEvent.std:unique(category)
		
on MyNonUnique as d delete from MyWindow as w where w.id = d.id

Variables can also be a useful way to parameterize an expiry policy. The next sample EPL assumes that a variable named CURRENT_THRESHOLD has been declared, and uses a pattern to execute the delete every 20 seconds:

on pattern[every timer:interval(20 sec)] 
delete from MyWindow where threshold > CURRENT_THRESHOLD

Finally, a plug-in view implementation may be the right choice if you need special parameterization, integration into the EPL language, or access to the Esper scheduling APIs.

How do I seed an empty data window from a filled data window?

This is a feature of named windows. When a new statement is created against a named window that already holds events, that statement's aggregation does not start empty.

Also, a named window may be initialized from another named window; look up the "insert" keyword in the create window clause.

How do I keep a separate window of events per category and compute aggregates for each category's window?

I have one or more categories and for each of these categories I need to keep a separate window of events.

In the statement below we have stock tick events for which we want to compute the average price of the last 10 stock tick events per symbol. Notice we are not using the last 10 events overall, we are looking at the last 10 events per symbol.

select symbol, avg(price) as avgPrice 
from StockTick.std:groupby(symbol).win:length(10) 
group by symbol

We can also specify multiple categories:

select symbol, location, avg(price) as avgPrice 
from StockTick.std:groupby(symbol,location).win:length(10) 
group by symbol, location

Let's consider another possible way of using a separate window of events per category. In some use cases we may need to compute not an average per group, but an average over all groups that considers only the last N events per group. This can be accomplished by leaving the group-by clause off. Now the engine computes the average price over all symbols, considering only the last 10 events per symbol:

select symbol, location, avg(price) as avgPrice 
from StockTick.std:groupby(symbol).win:length(10)
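
The difference between the two statements is easiest to see side by side. This plain-Java sketch keeps a bounded deque of the last N prices per symbol, mimicking std:groupby(symbol).win:length(N). It then computes both the per-symbol average and the average over all retained events. The names are hypothetical and this is not the Esper API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class PerSymbolLengthWindow {
    private final int length;
    // One bounded window per symbol, like std:groupby(symbol).win:length(N).
    private final Map<String, Deque<Double>> windows = new HashMap<>();

    PerSymbolLengthWindow(int length) {
        this.length = length;
    }

    void onTick(String symbol, double price) {
        Deque<Double> w = windows.computeIfAbsent(symbol, k -> new ArrayDeque<>());
        w.addLast(price);
        if (w.size() > length) {
            w.removeFirst();   // evict the oldest price of this symbol only
        }
    }

    // With "group by symbol": the average of one symbol's own window.
    double avgForSymbol(String symbol) {
        return windows.getOrDefault(symbol, new ArrayDeque<>()).stream()
                .mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    // Without group by: one average over the retained events of all symbols.
    double avgOverAll() {
        return windows.values().stream().flatMap(Deque::stream)
                .mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        PerSymbolLengthWindow w = new PerSymbolLengthWindow(2);
        w.onTick("IBM", 10);
        w.onTick("IBM", 20);
        w.onTick("IBM", 30);
        w.onTick("MSFT", 40);
        System.out.println(w.avgForSymbol("IBM")); // 25.0: IBM window holds 20 and 30
        System.out.println(w.avgOverAll());        // 30.0: 20, 30 and 40
    }
}
```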

How do I use results of one statement in another statement?

Use the insert into syntax to use the events generated by one statement as input to another statement.

We can first compute the number of events arriving within 1 second, then use that number to perform additional aggregation. Here we compute for the last 30 seconds the maximum and minimum rate per feed.

insert into TicksPerSecond select feed, count(*) as cnt 
from MarketDataEvent.win:time_batch(1 second) 
group by feed
	
select feed, max(cnt) as maxCount, min(cnt) as minCount 
from TicksPerSecond.win:time(30 sec) 
group by feed

How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?

Use output rate limiting to stabilize or reduce the rate at which rows are output from a query, by outputting rows at a specified time or row-based interval.

The example below limits the otherwise continuous output to an output row every 5 seconds. The output contains the feed and average volume per feed of the last 60 seconds of market data events.

select feed, avg(volume) as avgVolume from MarketDataEvent.win:time(60 sec) 
group by feed 
output every 5 seconds

How do I delay data? How do I compare against previous events?

There are a few different approaches that this section outlines.

Your application may need to delay events for a certain time. A simple way to delay data is to enter the data into a time window and select the remove stream which is the data leaving the window:

insert rstream into DelayedStream select rstream task, measurement, rate from CurrentStream.win:time(10 min)

In order to compare current data with delayed data, one possible way is to join delayed data and current data. For example:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream as c unidirectional, DelayedStream.std:lastevent() as d
where d.task = c.task and d.measurement = c.measurement

This example uses the "unidirectional" keyword. The keyword indicates that results are output only when events of one stream arrive, and not the other. In this example, only CurrentStream events trigger the join; the arrival of DelayedStream events produces no output.

Here is an alternative that uses the "output snapshot" keywords instead. This example executes the join and posts results only once every minute:

select d.task, d.measurement, d.rate - c.rate as delta
from CurrentStream.std:lastevent() as c, DelayedStream.std:lastevent() as d
where d.task = c.task and d.measurement = c.measurement
output snapshot every 1 minute

Instead of the join, the "prev" previous-event function could be used to fetch and compare data from previous rows. This is useful if the arrival intervals of the stream are known:

select task, measurement, rate - prev(4, rate) as delta
from CurrentStream.win:time(5 min)

The "prev" previous-event function also works well with the "std:groupby" view, in that it operates per group when used with this view, for example:

select rate, prev(1, rate) from MyStream.std:groupby(task).win:length(2)
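
Under std:groupby(task), prev(1, rate) looks at the previous event of the same task rather than of the whole stream. For a lookback of one, that per-group history reduces to remembering the last rate per task, as in this plain-Java sketch. It is not Esper code and the names are hypothetical. Returning null when a task has no earlier event is meant to mirror prev returning null when no previous event exists.

```java
import java.util.HashMap;
import java.util.Map;

class PrevPerGroup {
    // Last rate seen per task: the only history prev(1, rate) needs under std:groupby(task).
    private final Map<String, Double> lastRateByTask = new HashMap<>();

    // Returns the previous rate of the same task, or null if the task has no earlier event.
    Double onEvent(String task, double rate) {
        return lastRateByTask.put(task, rate);
    }

    public static void main(String[] args) {
        PrevPerGroup p = new PrevPerGroup();
        System.out.println(p.onEvent("t1", 1.0)); // null: first t1 event
        System.out.println(p.onEvent("t2", 5.0)); // null: first t2 event
        System.out.println(p.onEvent("t1", 3.0)); // 1.0: previous t1 event, t2 is ignored
    }
}
```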

A pattern statement can also be a great approach to form pairs or triplets (or any other combination of old and current events) and insert the pattern-matched events into a new stream for further processing, or use the select-clause to determine deltas as this sample statement shows:

select a.qty - b.qty, a.acct, a.instr from pattern [
  every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)
]

...or...

insert into MyStream select a, b from pattern [every a=OrderStatus -> b=OrderStatus(acct=a.acct, instr=a.instr)]
select a.qty - b.qty from MyStream

How do I detect the absence of an event?

Use a pattern to detect the absence of an event. The below pattern fires if an event A is not followed by an event B within 10 seconds.

select * from pattern [every EventA -> (timer:interval(10 sec) and not EventB)]
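
Procedurally, the pattern means: for every A, an alert fires 10 seconds later unless a B arrived in between. The plain-Java sketch below evaluates that rule after the fact over caller-supplied timestamp lists, whereas the engine does it incrementally with timers. The method name is hypothetical. Counting a B that arrives exactly at the 10-second mark as too late is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

class AbsenceDetector {
    // For each A timestamp, returns the time the alert fires if no B arrives in time.
    static List<Long> alerts(List<Long> aTimes, List<Long> bTimes, long windowMillis) {
        List<Long> fired = new ArrayList<>();
        for (long a : aTimes) {              // "every EventA": each A is tracked separately
            long deadline = a + windowMillis;
            // Assumption: only a B strictly after A and strictly before the deadline counts.
            boolean seenB = bTimes.stream().anyMatch(b -> b > a && b < deadline);
            if (!seenB) {
                fired.add(deadline);         // timer:interval(10 sec) elapsed without EventB
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // A at 0 s is answered by B at 5 s; A at 20 s is not.
        System.out.println(alerts(List.of(0L, 20_000L), List.of(5_000L), 10_000)); // [30000]
    }
}
```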

Outer joins are also a good way to detect missing events; an outer-join solution is shown above under correlating events arriving out-of-order.

How do I detect the absence of an event and the presence of an event arriving too late?

Let's say we want to detect 2 situations:

  a) A Down event is not followed by an Up event for the same equipment id within 1 minute.
  b) A Down event is followed by an Up event for the same equipment id, but only 30 seconds or more after the Down event.

select * from pattern [
  every down=MyEvent(text='down') ->
  (
    (timer:interval(1 min) and not MyEvent(text='Up', equipmentId=down.equipmentId))
      or
    ( (timer:interval(30 sec) and not MyEvent(text='Up', equipmentId=down.equipmentId))
        -> 
      up=MyEvent(text='Up', equipmentId=down.equipmentId) where timer:within(30 seconds)
    )
  )
]

How do I report at a regular interval without any incoming events?

Let's say we want our listener to be invoked every 5 seconds and to select the last value, if any, from a stream.

select (select price from MarketData.std:lastevent()) as price 
from pattern [every timer:interval(5 sec)]

The pattern fires every 5 seconds causing the sub-select to take place, returning null if no MarketData events have come in, or returning the price column of the last MarketData event.

How do I find missing events arriving in 2 or more streams that are correlated?

As in SQL we can use outer joins to generate a result even if one or more of the correlated events are not found in a stream. Usually we want to generate the result after a certain time or after a certain number of events have been received, indicating that a correlated event is truly missing.

In this example we are looking for a withdrawal event without a login event for the same account number after 60 seconds.

We join withdrawal events with login events looking for login events that do not exist (account number is null). We want to get notified as these events leave the 60-second time window.

select rstream withdraw.accountNumber as accntNum, withdraw.amount as amount
from WithdrawalEvent.win:time(60 sec) as withdraw
     left outer join
     LoginEvent.win:time(60 sec) as login
on login.accountNumber = withdraw.accountNumber
where login.accountNumber is null

How do I look into the past and consider missing events? How do I do a snapshot, fire-and-forget or on-demand query?

I have the good old StockTrades stream with two fields: symbol and price. I'm trying to answer the following question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?".

Unfortunately, a simple time-based sliding window won't work. To see why, imagine there were only two price updates. The first, at 10:54 am, stated that the stock was at $71. The second, 2 minutes later, reported that the stock went down to $69. Now imagine that the above question was posed at 11:00 am (of the same day). We humans know that the answer is "yes", because the price was $71 between 10:54 and 10:56. But the first event is outside the 5-minute window and will thus be ignored by the system.

Since the question is posed at 11:00am and the result is expected to reflect events that arrived before 11:00am, the solution seems to require that some events, or perhaps only some aggregated information about events, be retained from before 11:00am.

Also, the "were" in the question "Were Company X stock actions priced above $70 during any moment of the last 5 minutes?" indicates that this is a snapshot on-demand query that should return results just once. In other words, the result is not expected to be continuous but a simple tuple returned by a fire-and-forget query.

In Esper the solution could be a named window that is created in the morning, say at 9am. Esper also supports snapshot on-demand (fire-and-forget) queries against named windows, both through the API and through an inward-facing JDBC driver for reporting. The named window needs to hold both the price and the prior price, to ensure that it does not miss the drop from $71 to $69. The Esper EPL queries would look something like this:

This set of statements would be created at 9am:

create window TickWindow.win:time(5 min) (price double, priorprice double)
insert into TickWindow select price, prior(1, price) as priorprice from StockTickEvent
	

The question is posed at 11:00am via a snapshot EPL query against the named window:

select * from TickWindow where price > 70 or priorprice > 70
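
The key point is that each row carries its prior price. The $71 tick itself expires from the window, but its value survives as the prior price of the $69 tick that followed. The plain-Java sketch below replays the example from the question. It is not Esper code, and it makes a few simplifications:

  • times are minutes since midnight;
  • the Tick record is hypothetical;
  • all ticks are kept and the 5-minute bound is applied at query time;
  • the exclusive lower boundary is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class PriorPriceSnapshot {
    record Tick(int minute, double price, Double priorPrice) {}

    private final List<Tick> ticks = new ArrayList<>();
    private Double lastPrice;   // used to fill priorPrice, like prior(1, price)

    void onTick(int minute, double price) {
        ticks.add(new Tick(minute, price, lastPrice));
        lastPrice = price;
    }

    // Fire-and-forget style query over the 5 minutes ending at queryMinute.
    boolean anyAbove(int queryMinute, double limit) {
        return ticks.stream()
                .filter(t -> t.minute() > queryMinute - 5)   // assumption: exclusive lower bound
                .anyMatch(t -> t.price() > limit
                        || (t.priorPrice() != null && t.priorPrice() > limit));
    }

    public static void main(String[] args) {
        PriorPriceSnapshot w = new PriorPriceSnapshot();
        w.onTick(10 * 60 + 54, 71);   // 10:54, $71
        w.onTick(10 * 60 + 56, 69);   // 10:56, $69 (prior price $71)
        System.out.println(w.anyAbove(11 * 60, 70)); // true, via the prior price of the 10:56 tick
    }
}
```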

Alternative solutions are as follows. One could use an on-select instead of a fire-and-forget query, as on-select is able to compile and maintain the proper index to speed up repeated execution. A second alternative changes the queries to keep only the maximum instead of each data point; however, the on-demand queries are then limited to questions about the maximum value.

Here is the syntax to use a predefined query via on-select instead of a fire-and-forget query:

on MyMaxQueryEvent as q select * from TickWindow where price > q.pricelimit or priorprice > q.pricelimit

When the question is posed at 11am one can send in a MyMaxQueryEvent with the pricelimit property set to 70 and the listener to the on-select statement gets the result.

How do I detect the absence of multiple unrelated events that are known in advance? Is there a way to simplify hundreds or thousands of similar statements into one global statement? How do I "prime" or initialize a large number of similar patterns without creating a pattern statement for each pattern instance?

We would like to detect the absence of certain events in the event stream. All possible tuples are known beforehand. Let's say a tuple simply consists of the IP address. The possible tuples might look like the following:

(ip=192.168.1.1), (ip=192.168.1.2), ... many more..., (ip=192.168.1.3)

Every hour, we would like to know whether any of those known tuples was not present in the event stream. The pattern EPL for a single IP address might look like:

select * from pattern [every (timer:interval(1 hours) and not IPEvent(ip='192.168.1.1'))]

We would like to simplify this so hundreds or thousands of similar statements are reduced to one global statement, and send in a primer event to initialize each IP address looked for.

The single pattern that reacts to primer events:

select * from pattern [every p=PrimerEvent -> (every (timer:interval(1 hours) and not IPEvent(ip=p.ip)))]

Starting from this simple pattern, one could also add control events, for example to indicate when to stop looking for an IP address.

Another possible solution may use a named window to hold the last event per client id and source id, and a second named window to hold only those IP addresses to report on. Then, at intervals, one could select those events whose timestamp is older than two hours and that exist in the second named window, via on-select for example.

How do I detect something really complex, like a triple-bottom pattern?

The triple-bottom pattern comes from the world of stock trading and is described in detail in Triple-Bottom Pattern.

The problem can be broken down: First, how does one identify bottom price points among a stream of market data events? Second, once the individual bottom price points are identified, how does one detect an occurrence of 3 bottom price points, whose value is within approximation of each other, and that are spaced out over time in a pattern?

The first problem is an event stream processing problem, I believe. The stream of events is market data that contains price points for the NIFTY index over time. I'll attempt to define a bottom price point as follows: if the average price for the last 5 days is 15% lower than the average price over a period of, say, 60 days, then the minimum price during those 5 days is a single bottom price point. Of course, the number of days and the percentages are parameters to figure out and get right.

// The query to determine the average price for the last 60 days:
insert into AvgPriceLast60Days
select avg(price) as avgPrice
from MarketData.win:time(60 days)
output every 10 minutes

// The query to determine the average and minimum price for the last 5 days:
insert into AvgPriceLast5Days
select avg(price) as avgPrice, min(price) as minPrice
from MarketData.win:time(5 days)
output every 10 minutes

// Compare the last average prices of each:
insert into BottomPriceEvent
select LastAvg5Days.minPrice as bottomPrice 
from AvgPriceLast60Days.std:lastevent() as LastAvg60Days,
     AvgPriceLast5Days.std:lastevent() as LastAvg5Days
where LastAvg60Days.avgPrice * 0.85 > LastAvg5Days.avgPrice
output first every 1 day

The last statement populates the "BottomPriceEvent" event stream as a stream of higher-level events in which each event represents a bottom price point.

The second part of the problem requires detecting 3 bottom price points whose values are within a given range of each other, and that have a certain temporal relationship with each other. Let's assume that the bottom price points should be within 5% of each other. Let's also assume we are looking for bottom price points spaced at least 10 days apart from each other, but within 30 days of the prior bottom price point.

// The pattern to detect the triple-bottom:
insert into TripleBottomPattern
select * from pattern [every a=BottomPriceEvent 
  -> timer:interval(10 days) 
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)
  -> timer:interval(10 days) 
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)]

Finally, the TripleBottomPattern event stream populated by the last statement contains the higher-level complex events, each indicating that a triple-bottom pattern has been detected.

An additional interesting problem is that the stream and pattern queries are rather long-running continuous queries, since they need to run over days and months. That may require persisting events and/or using simulated time by playing back past events into the engine.

Can I use an UpdateListener to listen to events and the iterator-based pull-API together?

UpdateListener and iterator can be used together. An update listener implementation can also itself query the same statement or another statement's iterator, as the engine guarantees the iterators are up-to-date before calling update listeners even across statements.

The iterator can also be used to find out that there are no matches. The iterator's overhead depends on the type of statement iterated on; for pattern statements specifically, it is negligible.

[top]

How do I stop an insert after a period of time?

Assume we only want to receive the first 10 minutes of an incoming event stream and then stop receiving data from that stream.

The timer:within pattern guard function can serve here to stop the stream after a given amount of time, as the next statement shows:

insert into PackageNotifyEvent
select myevent.uid as uid, 'HOME' as loc, 'ARRIVED' as status 
from pattern[(every myevent=TrackingEvent) where timer:within(10 min)]
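Outside the engine, the intended behavior (forward events only during the first 10 minutes, then stop for good) can be sketched in plain Java as follows. TimeLimitedForwarder is a hypothetical name, and treating the 10-minute deadline as exclusive is an assumption of the sketch, not a statement about timer:within.

```java
import java.util.function.Consumer;

// Plain-Java sketch: forward events only during the first 'durationMillis'
// after 'startMillis', then stop permanently (like a guard that expires once).
final class TimeLimitedForwarder<T> {
    private final long deadlineMillis;
    private final Consumer<T> downstream;
    private boolean expired = false;

    TimeLimitedForwarder(long startMillis, long durationMillis, Consumer<T> downstream) {
        this.deadlineMillis = startMillis + durationMillis;
        this.downstream = downstream;
    }

    // Returns true if the event was forwarded. The deadline is treated as exclusive.
    boolean onEvent(long eventTimeMillis, T event) {
        if (expired || eventTimeMillis >= deadlineMillis) {
            expired = true; // once expired, later events are never forwarded
            return false;
        }
        downstream.accept(event);
        return true;
    }
}
```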
[top]

Can I use a regular expression (regexp) within a filter?

Yes, a regular expression can be used as part of a filter expression. Pretty much any expression is allowed within event filter expressions, other than aggregation functions and the prev and prior functions.

select * from pattern[every myevent=TrackingEvent(uid regexp '^a-b-.*' 
  and lat in [40.1:40.2] and lon in [-74.1:-74.0])]
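For unit-testing the same criteria in application code, a plain-Java equivalent of the filter might look like this. TrackingFilter is hypothetical; it uses Matcher.matches(), i.e. whole-string matching, and treats the [a:b] ranges as inclusive at both ends. Check the EPL reference for the engine's exact regexp semantics.

```java
import java.util.regex.Pattern;

// Hypothetical plain-Java version of the uid/lat/lon filter criteria.
final class TrackingFilter {
    // Compiled once; matches() requires the whole uid to match the pattern.
    private static final Pattern UID = Pattern.compile("^a-b-.*");

    static boolean matches(String uid, double lat, double lon) {
        return uid != null
            && UID.matcher(uid).matches()
            && lat >= 40.1 && lat <= 40.2      // lat in [40.1:40.2]
            && lon >= -74.1 && lon <= -74.0;   // lon in [-74.1:-74.0]
    }
}
```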
[top]

How can I remove duplicates? What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?

I'm trying to detect pairs of events correlated by a "type" value and a unique "device" value. Then based on this pair of events, I'd like to find the maximum "measurement" value and the corresponding "confidence" for the one with the max "measurement" value. Here's my event object:

class Sensor {
	long id;
	String type;
	String device;
	Double measurement;
	Double confidence;
}

I'll know the set of possible device values in advance, but the Sensor events can happen in any order, and two or more Sensor events for the same device might occur before a Sensor event for the other device occurs. In that case, each later Sensor event replaces the earlier Sensor event for the same device and type. In other words, the last event for a particular device of a given type is the one that should be used in the calculation of the maximum.

The computation would be the maximum value of the 'measurement' property between A and B. Also, the 'confidence' value would correspond to the value from the event with the maximum 'measurement' property.

A sample input and output:

Sensor[id=1,type='Temperature',device='A',measurement=51,confidence=94.5]
Sensor[id=2,type='Temperature',device='A',measurement=57,confidence=95.5]
Sensor[id=3,type='Humidity',device='B',measurement=29,confidence=67.5]
Sensor[id=4,type='Temperature',device='B',measurement=55,confidence=88.0]
Sensor[id=5,type='Temperature',device='B',measurement=65,confidence=85.0]
Sensor[id=6,type='Temperature',device='B',measurement=49,confidence=87.0]
Sensor[id=7,type='Temperature',device='A',measurement=51,confidence=99.5]

For output, one would expect the following:

// First output event pairs events with id=2 and id=4 and chooses the values from id=2
MaxReading[type='Temperature',device='A',measurement=57,confidence=95.5]
// Second output event pairs events with id=6 and id=7, since the event with id=6
// replaces the one with id=5, the event with id=5 is never compared against the
// event with id=7
MaxReading[type='Temperature',device='A',measurement=51,confidence=99.5]

One possible solution builds pairs of events using a join:

// Create pairs of device A and B events
insert into Pair
select * from Sensor(device='A').std:lastevent() as a, Sensor(device='B').std:lastevent() as b
where a.type = b.type

From the resulting stream we remove those pairs in which either event was seen before, leaving unique pairs:

// Fast way of declaring the stream type, could also be done via config API or XML
insert into PairDuplicatesRemoved select * from Pair(1=2)
       
// Remove duplicate pairs where either sensor event is from the prior pair
insert into PairDuplicatesRemoved
select * from Pair
where a.id != (select a.id from PairDuplicatesRemoved.std:lastevent())
	and b.id != (select b.id from PairDuplicatesRemoved.std:lastevent())

Last, select the maximum measurement value between the pair of sensor events and the corresponding confidence and device:

select a.type,
       max(a.measurement, b.measurement) as measurement,
       case when a.measurement > b.measurement then a.confidence else b.confidence end as confidence,
       case when a.measurement > b.measurement then a.device else b.device end as device
       from PairDuplicatesRemoved
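To check the expected output, here is a plain-Java simulation (not Esper code) of the three statements, replaying the sample input: it keeps the last event per device, joins on type, drops a pair that shares an event with the previously emitted pair, and keeps the reading with the larger measurement. The class names are hypothetical, and output holds the winning Sensor of each unique pair rather than a separate MaxReading type.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for the Sensor event (fields only).
final class Sensor {
    final long id;
    final String type;
    final String device;
    final double measurement;
    final double confidence;

    Sensor(long id, String type, String device, double measurement, double confidence) {
        this.id = id;
        this.type = type;
        this.device = device;
        this.measurement = measurement;
        this.confidence = confidence;
    }
}

// Plain-Java simulation: last event per device, join on type,
// duplicate removal, then the max measurement of each pair.
final class PairDedupSketch {
    private Sensor lastA, lastB;                    // std:lastevent() for devices A and B
    private Sensor prevPairA, prevPairB;            // last pair that survived duplicate removal
    final List<Sensor> output = new ArrayList<>();  // winning reading of each unique pair

    void onEvent(Sensor s) {
        if ("A".equals(s.device)) {
            lastA = s;
        } else if ("B".equals(s.device)) {
            lastB = s;
        } else {
            return;
        }
        // Join: both sides present and of the same type.
        if (lastA == null || lastB == null || !lastA.type.equals(lastB.type)) {
            return;
        }
        // Duplicate removal: drop the pair if it shares an event with the previous pair.
        if (prevPairA != null && (lastA.id == prevPairA.id || lastB.id == prevPairB.id)) {
            return;
        }
        prevPairA = lastA;
        prevPairB = lastB;
        // Same tie-breaking as the case expressions: ties go to device B.
        output.add(lastA.measurement > lastB.measurement ? lastA : lastB);
    }
}
```

Replaying events 1 to 7 from the sample input leaves two entries in output, the events with id=2 and id=7, matching the expected MaxReading values.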
[top]

How do I remove or drop events from a stream? I have a dynamic set of negative filters?

Let's assume you have a stream of events and you want to remove events from the stream before further processing of the remaining events by other statements.

The @Drop annotation marks statements that preempt further processing of an event when the event matches multiple statements. The @Priority annotation is also useful: when an event matches multiple statements, it specifies which statements process the event first. Note that @Drop and @Priority require an engine-level configuration setting that is off by default; please see the documentation for further details.

Other ways to solve this use case are to use the UnmatchedListener to catch unmatched events, or to use the EPL split-stream syntax.
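When the set of negative filters changes at runtime, another option is to apply the filters in application code before events are sent into the engine. This is an application-side alternative, not the @Drop mechanism; NegativeFilterGate is a hypothetical class, and the downstream consumer stands in for whatever sends events into the engine.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical pre-filter: events matching any registered negative filter are
// dropped; all other events are passed downstream.
final class NegativeFilterGate<T> {
    // Copy-on-write so filters can change while events are flowing.
    private final List<Predicate<T>> dropFilters = new CopyOnWriteArrayList<>();
    private final Consumer<T> downstream;

    NegativeFilterGate(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void addDropFilter(Predicate<T> filter) {
        dropFilters.add(filter);
    }

    void removeDropFilter(Predicate<T> filter) {
        dropFilters.remove(filter); // needs the same Predicate reference that was added
    }

    // Returns true if the event was forwarded.
    boolean send(T event) {
        for (Predicate<T> f : dropFilters) {
            if (f.test(event)) {
                return false;
            }
        }
        downstream.accept(event);
        return true;
    }
}
```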

[top]

How do I detect a specific sequence of events and get all events that take part in this sequence?

I have events coming from different sources. They have 2 states: success and failure, and they have a source. I would like to create a query to know when there are, for example, for a specific source 5 failure events followed by a success one. Of course as soon as there's a success event, the previous failure events shouldn't count anymore.

There needs to be a maximum time to wait for a success event to arrive, since we don't want to keep looking for a matching success event forever. Let's say the maximum wait is 5 minutes, so failure events are simply dropped after 5 minutes.

Let's look at some samples: F5 means 5th failure event, S3 means 3rd success event. Also let's say we only need 5 failure events before a success one to have an alert.

Case1 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 S1
then I want to throw an alert. The alert must know about those events meaning I would like the listener to get those events.

Case2 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 F5 F6 F7 F8 S1
then I would have an alert knowing about F4 to F8 and S1.

Case3 - If within 5 minutes I have (from the same source)

F1 F2 F3 F4 S1 F5 F6 S2
then no alert would be emitted since once S1 arrives there were only 4 failure events.

Case4 - still from the same source

F1 F2 F3 F4 (then 10 minutes later) F5 S1
then, of course, no alert, since the events are not all within the 5-minute window.

Case5 - If within 5 minutes (this time we have the sources a and b)

F1a F1b F2a F3a F2b F4a S1b F5a F3b S1a
No alert is created when S1b arrives because there are only 2 failures for b. When S1a arrives, an alert is created because we have F1a to F5a before it, with no success for a in between.

Solution: Since we are looking for a very specific sequence of events, a pattern is the best way to go. We want to make sure the pattern subexpressions end when a success event arrives, and this can be accomplished via the not-operator. We also want to limit the expression to live 5 minutes from the first failure event:

every a=F -> (
        (b=F(src=a.src) and not S(src=a.src)) ->
        (c=F(src=a.src) and not S(src=a.src)) ->
        (d=F(src=a.src) and not S(src=a.src)) ->
        (e=F(src=a.src) and not S(src=a.src)) ->
        (f=S(src=a.src) and not F(src=a.src))
     ) where timer:within(5 min)

This solution works for all cases, including case 2. Even though the pattern looks for only 5 events in a row, it looks at any 5 subsequent events for the same source, matching case 2 for events F4 to F8 and S1 (the active expressions that include F1, F2 and F3 end when F6, F7 and F8 arrive).
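The same per-source rule can be sketched in plain Java, which may help when writing expectations for the five cases. This is not how Esper evaluates the pattern; it is a hypothetical, simplified equivalent that keeps, per source, only the most recent consecutive failures and raises an alert when a success follows 5 of them and the first of those 5 is at most 5 minutes old. StatusEvent and FailureSequenceDetector are hypothetical names, and the inclusive 5-minute boundary is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event: source, success flag, timestamp, and a label for display.
final class StatusEvent {
    final String src;
    final boolean success;
    final long timeMillis;
    final String label;

    StatusEvent(String src, boolean success, long timeMillis, String label) {
        this.src = src;
        this.success = success;
        this.timeMillis = timeMillis;
        this.label = label;
    }
}

// Alert when a success follows N consecutive failures of the same source and
// the first of those N failures is at most windowMillis older than the success.
final class FailureSequenceDetector {
    private final int failures;
    private final long windowMillis;
    private final Map<String, Deque<StatusEvent>> recentFailures = new HashMap<>();

    FailureSequenceDetector(int failures, long windowMillis) {
        this.failures = failures;
        this.windowMillis = windowMillis;
    }

    // Returns the failures plus the success that form the alert, or null for no alert.
    List<StatusEvent> onEvent(StatusEvent e) {
        Deque<StatusEvent> q = recentFailures.computeIfAbsent(e.src, k -> new ArrayDeque<>());
        if (!e.success) {
            q.addLast(e);
            if (q.size() > failures) {
                q.removeFirst(); // only the last N failures can still complete a match
            }
            return null;
        }
        List<StatusEvent> alert = null;
        if (q.size() == failures && e.timeMillis - q.peekFirst().timeMillis <= windowMillis) {
            alert = new ArrayList<>(q);
            alert.add(e);
        }
        q.clear(); // a success ends every open sequence for this source
        return alert;
    }
}
```

For case 2, feeding F1 to F8 and then S1 returns F4 to F8 plus S1, as described above.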

[top]

How to implement a sell trailing stop order?

This is an example out of the stock trading domain, in which incoming events are market prices. A sell trailing stop order is a technique that is designed to allow an investor to specify a limit on the maximum possible loss, without setting a limit on the maximum possible gain.

A sell trailing stop order sets the lower boundary (stop) price at a fixed amount, the "trailing" amount, below the current market price. As the market price rises, the stop price rises with it, staying the trail amount below the highest price reached; if the stock price falls, the stop price doesn't change, and a market order is submitted when the stop price (lower boundary) is hit.

Assume that the market price is 700 at the time of placing the trailing stop order. Assume that the stop price is 600. If the price goes to 703, the stop price must be updated to 603. If the price drops to 682, the trigger is still 603.

The solution considers the maximum market price since statement start time, compared against the current market price:

// since release 2.0
select * from Quote(symbol='GOOG')
where lastPx <= max((select max(lastPx) as lastPx from Quote(symbol='GOOG')) - 100, 600)
// or, for release 1.x
insert into MaxPrice(lastPx) select max(lastPx) as lastPx from Quote(symbol='GOOG')
select * from Quote(symbol='GOOG')
where lastPx <= max((select lastPx from MaxPrice.std:lastevent()) - 100, 600)
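The same stop logic can be sketched in plain Java, which makes the example numbers easy to check: the stop is the larger of the initial stop and the highest price seen minus the trail amount. TrailingStop is a hypothetical class; the 600 initial stop and 100 trail amount are the values from the example above.

```java
// Hypothetical plain-Java model of a sell trailing stop: the stop trails the
// highest price seen so far by a fixed amount and never moves down.
final class TrailingStop {
    private final double trail;
    private double stop;
    private boolean triggered = false;

    TrailingStop(double initialStop, double trail) {
        this.stop = initialStop;
        this.trail = trail;
    }

    // Feeds one market price; returns true exactly once, when the sell should be submitted.
    boolean onPrice(double price) {
        if (triggered) {
            return false;
        }
        stop = Math.max(stop, price - trail); // ratchet the stop up, never down
        if (price <= stop) {
            triggered = true;
            return true;
        }
        return false;
    }

    double stopPrice() {
        return stop;
    }
}
```

Feeding 700, 703, 682 and 603 keeps the stop at 600, raises it to 603, leaves it at 603, and triggers the sell on 603.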
		
[top]

I have one listener for multiple statements, how do I track which statement generated a result?

Your listener can implement the StatementAwareUpdateListener interface and get passed the statement and engine instance for each result.

For some use cases it can also come in handy to simply add a constant to each statement to identify the statement producing a result, for example:

select 120 as strategyId, * from Tick
[top]

Is there a way to receive the list of all events that participated in a window? I'm looking for a way to show the user the cause of the alert.

The pull API is a convenient way to retrieve data from a data window. The safeIterator method on EPStatement provides the events in a data window.

If the alert is based on a filter, then one may need to create a second statement that doesn't have the filter, such that iteration returns all current rows. A second statement with the same data window is inexpensive, however, since the engine shares data windows. There is no possibility of a race condition: the engine guarantees that before a result is delivered to listeners, all statements have up-to-date data.

[top]

I want to know if an event hits at least one registered query?

Esper has an UnmatchedListener interface that one can register with the engine via epRuntime.setUnmatchedListener(UnmatchedListener). The UnmatchedListener receives any event that has not been processed by any statement, i.e. events that do not match any statement's stream filter. Only stream filters count: an event that passes a statement's stream filter counts as matched even if that statement's where-clause then discards it, since the event still enters the statement's data window.

[top]

I want to know what streams an EPL statement references but don't want to parse the EPL string? I want to programmatically inspect and change the select clause columns when a user enters an EPL query, how to do this?

The statement object model is designed to handle this case. Your application could compile (epAdministrator.compile) each statement and get an EPStatementObjectModel object representation of the statement and then interrogate the from-clause or select-clause.

The statement object model also allows EPL queries to be composed from scratch via normal Java POJO objects. A statement object model can be rendered back into an EPL string, and it is also possible to create a statement directly from a statement object model.

[top]

How do I store statements in a file? Is there a standard file storage?

Esper does not prescribe any particular way of storing EPL queries. Some applications prefer XML files and some prefer properties files. Your application may use a table in a relational database to store queries if so desired.

We present below an example of storing statements in a property file. This is not a recommendation and is merely one possible way among many of storing EPL queries in a file.

Here is the example:

#Alert12
esper.statement.alert12.0=create window Alert12Window.win:time(1 hour) as select ticker as alertId, this from LastTickEvent
esper.statement.alert12.0.ha.prefix=resilient
esper.statement.alert12.1=insert into MaxPrice:alertId:(lastPx) select max(lastPx) as lastPx from LastTickEvent(ticker=':ticker:')
esper.statement.alert12.1.ha.prefix=resilient
esper.statement.alert12.2=insert into Alert:alertId:Event select * from LastTickEvent(ticker=':ticker:') where lastPx <= (1-(:value:/100.0))*(select lastPx from MaxPrice:alertId:.std:lastevent())
esper.statement.alert12.2.ha.prefix=durable
esper.statement.alert12.3=insert into Alert12Window select ':alertId:' as alertId, quote.this as this from pattern [quote=Alert:alertId:Event]
esper.statement.alert12.3.ha.prefix=durable

This way of storing EPL assigns each alert a name, and each of the alert's EPL statements a suffix that represents the statement number. It uses the :replaceme: format as a parameter placeholder that the application itself replaces with a value before creating a statement. This example does not use prepared statements.
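A minimal sketch of the application side of this convention, assuming the statements are numbered consecutively from 0. EplPropertyLoader is hypothetical; it returns the EPL strings in order, ready to be created one by one, and ignores the .ha.prefix entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical loader: reads esper.statement.<alert>.<n> for n = 0, 1, 2, ...
// until a number is missing, and replaces each :name: placeholder with the
// application-supplied value. Values are inserted verbatim (no quoting or escaping).
final class EplPropertyLoader {
    static List<String> load(Properties props, String alert, Map<String, String> params) {
        List<String> statements = new ArrayList<>();
        for (int n = 0; ; n++) {
            String epl = props.getProperty("esper.statement." + alert + "." + n);
            if (epl == null) {
                break; // stop at the first missing statement number
            }
            for (Map.Entry<String, String> p : params.entrySet()) {
                epl = epl.replace(":" + p.getKey() + ":", p.getValue());
            }
            statements.add(epl);
        }
        return statements;
    }
}
```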

[top]

When to use a plug-in aggregation function and when to use a plug-in custom view?

If you are not sure how to choose between custom plug-in aggregation functions and custom plug-in views, this entry explains the differences or advantages of one over the other in more detail.

A plug-in custom aggregation function works like other aggregation functions such as count, sum, average or standard deviation and may appear in the select-clause and in the having-clause in EPL.

A plug-in custom view can be a data window providing an expiry policy like a time window or length window, for example. Or instead a custom view can derive new information from a stream of events such as the results of a linear regression function (aka. derived-value view).

A plug-in view is always attached to a certain type of event that is provided by a filtered event stream, by a pattern or by another view. Plug-in views can receive only one type of input event (input stream). If the view is a data window view, the output event type is always the same event type as the input event type. For derived-value views, the output event type can be an entirely different type of event with new and often computed property values and types, including events from a different event representation, such as XML-DOM.

If your application wants to provide a data window then use a plug-in view. If it needs to provide multiple computed values for each row of output, such as the slope-value and y-intercept value for a linear regression function for example, use a plug-in view.

The input values to a plug-in aggregation function are the result of one or more expressions (as compared to views, which have events as input), and the output value of a plug-in aggregation function is always a single value, usually a primitive value such as a double-typed value, but it can be any object. If your application only needs to return a single value, an aggregation function is more likely to be appropriate.

The group-by clause acts only on aggregation functions (and not view output), providing an aggregation value per-group. Also output-rate-limiting can provide the last value per such group when the output condition occurs.

Views can also compute one or more output values per group by means of the "std:groupby()" view. These view output events are not grouped by the group-by or output-rate clauses, if present.

A view's output event properties become available in the select-clause, where-clause, group-by-clause, having-clause and order-by clause, while aggregation functions output values are only available in the select-clause and having-clause.

[top]

When to use on-demand fire-and-forget queries versus on-select predefined queries?

Sometimes user requirements are such that a query against data maintained by the engine must be fired. Sometimes such intra-day queries are well-defined and known upfront, sometimes not.

Via named windows Esper allows predefined queries based on the on-select clause.

Via named windows Esper also allows fire-and-forget queries that leave no trace. Fire-and-forget queries can also be compiled for repeated execution.

Here is a sample code snippet to prepare and call a fire-and-forget query:

String stmtText = "select * from SensorWindow where temperature = 80";
EPOnDemandPreparedQuery onDemandQuery = epService.getEPRuntime().prepareQuery(stmtText);
EPOnDemandQueryResult result = onDemandQuery.execute();
System.out.println(result.getArray()[0].get("sensor"));

An on-demand fire-and-forget query incurs the cost of compiling the query and of executing it against an un-indexed data set, making it slower to execute than a predefined query. The advantage is that it allows any type of query.

Compare this to a predefined query based on the on-select clause. The next code snippet creates and executes a pre-defined query:

String stmtText = "on SensorQueryEvent select sensor from SensorWindow where temperature = querytemp";
EPStatement onSelectStmt = epService.getEPAdministrator().createEPL(stmtText);
onSelectStmt.setSubscriber(this);	// make sure you have an update(String sensor) method for the class

// Execute query, results are delivered via call to the update method.
// The SensorQueryEvent is expected to have a "querytemp" property as used in the on-select.
epService.getEPRuntime().sendEvent(new SensorQueryEvent(80));

A predefined query allows the Esper engine to inspect the query conditions and thus maintain a proper index on named window contents, so that each query is evaluated very efficiently. A predefined query can therefore exhibit much better performance than a fire-and-forget query. See also the named window query benchmark for performance tests of both approaches.
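The following plain-Java sketch illustrates the general idea behind that difference, not Esper's internals: an ad-hoc query over an un-indexed collection has to scan every row, while a query whose equality condition is known upfront can be served from a hash index maintained as rows arrive. SensorRow and SensorWindowSketch are hypothetical, and removal of expired rows is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row type for SensorWindow.
final class SensorRow {
    final String sensor;
    final int temperature;

    SensorRow(String sensor, int temperature) {
        this.sensor = sensor;
        this.temperature = temperature;
    }
}

// Full scan versus a hash index on "temperature" maintained on insert.
final class SensorWindowSketch {
    private final List<SensorRow> rows = new ArrayList<>();
    private final Map<Integer, List<SensorRow>> byTemperature = new HashMap<>();

    void insert(SensorRow row) {
        rows.add(row);
        byTemperature.computeIfAbsent(row.temperature, k -> new ArrayList<>()).add(row);
    }

    // Un-indexed: inspects every row, O(n) per query.
    List<SensorRow> scan(int temperature) {
        List<SensorRow> result = new ArrayList<>();
        for (SensorRow r : rows) {
            if (r.temperature == temperature) {
                result.add(r);
            }
        }
        return result;
    }

    // Indexed: expected O(1) lookup plus the size of the result.
    List<SensorRow> indexed(int temperature) {
        return byTemperature.getOrDefault(temperature, Collections.emptyList());
    }
}
```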

[top]

How do I integrate with the Spring framework? How to use Spring support for Groovy or other scripting languages with EPL?

The Spring Framework (or Spring for short) is an open source application framework. This FAQ entry describes how a Spring XML file can hold EPL statements and inject listeners. It also shows how the Groovy dynamic scripting language can provide inlined scripts that act as listeners to EPL continuous-query statements.

This solution requires Spring and Java 6.

A sample XML file for use with Spring follows. The XML relies on two classes in your classpath: EsperBean and StatementBean. These classes are NOT part of the Esper distribution. They are instead listed below as examples.

<beans>
    <bean id="esperBean" class="EsperBean">
	<property name="statements">
	    <bean class="StatementBean">
		<constructor-arg value="select * from java.lang.String"/>
		<property name="listeners">
		    <list>
			<bean class="MyUpdateListener"/>
			<ref bean="groovyListener"/>
		    </list>
		</property>
	    </bean>
	</property>
    </bean>

    <!--sample groovy listener-->
    <lang:groovy id="groovyListener">
	<lang:inline-script>
	    package org.springframework.scripting.groovy;
	    import com.espertech.esper.client.UpdateListener
	    import com.espertech.esper.client.EventBean;

	    class GroovyMessenger implements UpdateListener {
		public void update(EventBean[] eventBeans, EventBean[] eventBeans1) {
		    System.out.println(Arrays.toString(eventBeans) + "from groovy");
		}
	    }
	</lang:inline-script>
    </lang:groovy>

</beans>

The EsperBean class below represents a thin wrapper for an EPServiceProvider:

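import java.util.LinkedHashSet;
import java.util.Set;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;

public class EsperBean implements BeanNameAware, InitializingBean, DisposableBean {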
    private EPServiceProvider epServiceProvider;
    private EPRuntime epRuntime;
    private String name;
    private Set<StatementBean> statementBeans = new LinkedHashSet<StatementBean>();

    public void setStatements(StatementBean... statementBeans) {
	for (StatementBean statementBean : statementBeans) {
	    addStatement(statementBean);
	}
    }

    public void addStatement(StatementBean statementBean) {
	statementBeans.add(statementBean);
    }

    public void sendEvent(Object event) {
	epRuntime.sendEvent(event);
    }

    public void setBeanName(String name) {
	this.name = name;
    }

    public void afterPropertiesSet() throws Exception {
	epServiceProvider = EPServiceProviderManager.getProvider(name);
	epRuntime = epServiceProvider.getEPRuntime();
	for (StatementBean statementBean : statementBeans) {
	    EPStatement epStatement = epServiceProvider.getEPAdministrator().createEPL(statementBean.getEPL());
	    statementBean.setEPStatement(epStatement);
	}
    }

    public void destroy() throws Exception {
	epServiceProvider.destroy();
    }
}

The StatementBean class is a thin wrapper for an EPStatement, and is also required for the example:

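import java.util.LinkedHashSet;
import java.util.Set;

import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.UpdateListener;

public class StatementBean {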
    private String epl;
    private EPStatement epStatement;
    private Set<UpdateListener> listeners = new LinkedHashSet<UpdateListener>();

    public StatementBean(String epl) {
        this.epl = epl;
    }

    public String getEPL(){
        return epl;
    }

    public void setListeners(UpdateListener... listeners) {
        for (UpdateListener listener : listeners) {
            addListener(listener);
        }
    }
    public void addListener(UpdateListener listener) {
        listeners.add(listener);
        if (epStatement != null) {
            epStatement.addListener(listener);
        }
    }

    void setEPStatement(EPStatement epStatement) {
        this.epStatement = epStatement;
        for (UpdateListener listener : listeners) {
            epStatement.addListener(listener);
        }
    }
}

Finally, next is a sample code snippet for loading the XML file in Spring, which will automatically hook up the statements and listeners as defined in the XML:

ClassPathXmlApplicationContext appContext = new ClassPathXmlApplicationContext(new String[]{"esperspring.xml"});
EsperBean esperBean = (EsperBean) appContext.getBean("esperBean", EsperBean.class);
esperBean.sendEvent("Test Event");
// ...when done, destroy the context...
appContext.destroy();
[top]

We have our own math library, what are the options of utilizing it? How can I make calls out of the EPL into our own existing code?

There are several options. The best choice among the options depends on what you want to accomplish, and how the existing library, function or other system exposes its functionality (static methods, service or POJO etc.).

The first option is the user-defined method. You can invoke a user-defined method in any expression directly without any configuration. You can import a class via configuration to avoid package names in EPL. For example, assuming that the "com.mycompany.MyLibrary" class provides a static method by name "computeDistance":

select com.mycompany.MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent
// ... or after MyLibrary is imported via configuration
select MyLibrary.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent
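For reference, the Java side of the first option could look like the following. This is a hypothetical MyLibrary (the example above places it in package com.mycompany, omitted here so the sketch stands alone), with computeDistance assumed to be the Euclidean distance. In your own code, declare the class public so the engine can access the static method.

```java
// Hypothetical user-defined function library for EPL. The method is static,
// and its parameters are bound from the x1, y1, x2, y2 values in the statement.
final class MyLibrary {
    private MyLibrary() {
        // static methods only
    }

    // Euclidean distance between (x1, y1) and (x2, y2).
    static double computeDistance(double x1, double y1, double x2, double y2) {
        return Math.hypot(x2 - x1, y2 - y1);
    }
}
```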

The second option is to invoke a method on your event object itself. This works only if your event representation is a Java object. An example, assuming that the "MyCoordinateEvent" event underlying class provides a method by name "computeDistance":

select myevent.computeDistance(x1, y1, x2, y2) from MyCoordinateEvent as myevent

The third option is to provide a custom aggregation function via the extension API. A custom aggregation function can take many parameters but returns only a single value; that value, however, can be any object. Please consult the documentation for examples. A sample EPL statement follows, assuming that the "myTrendFunction" custom aggregation function has been created and configured:

select myTrendFunction(price) from OrderEvent group by productId

The fourth option is to provide a custom view via the extension API. A custom view takes parameters as well as an input stream of events and generates a result stream of events. Please consult the documentation for examples. A sample EPL statement follows, assuming that the "mathlib:volatility" custom view has been created and configured:

select * from OrderEvent(symbol='IBM').mathlib:volatility()

The fifth option is to use a method invocation. A method invocation is a function that acts alone or in a join and returns rows. Please consult the documentation for examples. Here is a sample EPL that utilizes a join, assuming that the "cacheLookup" function is provided by class "com.mycompany.MyLibrary":

select * from RFIDEvent, method:com.mycompany.MyLibrary.cacheLookup(assetId)

The last option is to have your listener or subscriber code invoke the library. If needed, your listener can send results back into the engine as a further event.

[top]

Can I use SOAP, WS-*, RESTful, JMS, RPC or other remote calls?

The previous FAQ entry outlined how to invoke an external function. Your external function may access internal or external resources as part of its evaluation, using any of these standards.

Also, Esper provides certain input and output adapters as described in the EsperIO documentation.

You may also want to consider creating your own event representation via the extension API if your transport or event repository already has event metadata available that you want to reuse.

In the design you should keep in mind that blocking calls may reduce throughput.

[top]

Can Esper support a distributed cache or data grid?

We consider it rather straightforward to consume from and publish to a grid with Esper already, using a grid map listener implementation and other APIs. You can also do stream-to-grid joins using the EPL method invocation join capabilities.

High availability and failover are implemented in EsperHA out of the box (http://www.espertech.com). EsperHA also provides a pluggable event store so that events retained in windows can be stored (and possibly shared and computed against) in a grid.

[top]

What to do if my events are not JavaBeans, not well-defined, their properties are not known in advance and may differ wildly, and are nested?

Here is an actual user question:

Each data item implements an interface, but the properties available on the concrete objects differ wildly. Also, each data item can be considered to be composed of multiple levels of these data items. One of the fields on the interface is a getParent field, which returns the data item one level up. For example: If X is the object which we have a direct handle to, X.parent = Y, and Y.parent = Z, we often want to look at Z.field as part of our filter. Also, my events do not describe the property set of the event (not a JavaBean). Thus, I don't directly know that X's parent is Y, whose parent is of type Z, which then has a property named 'foo'. (Beans here are essentially user-defined structures for contributed code, and thus we have no real way of knowing what properties are on a bean until we receive the bean. When the users use the beans, they obviously know which bean they're working with. For us, we're getting it downstream, and that bean could be any of a dynamically growing variety.)

Dynamic properties are weakly-typed dynamically-resolved properties that use a '?' syntax to denote the dynamic part of a property expression, for example:

select parent?.parent.foo from XEvent

The above query returns the value of the "foo" property of the parent's parent of an XEvent, if the "parent" properties are present, and null otherwise. The value returned is of type Object and probably requires the use of the EPL "cast" or "instanceof" functions, depending on what to do with the value.

By moving the '?' operator to a different position, the query can indicate which properties in the nesting level must exist. The next query requires the "parent" properties to exist when the statement is compiled, and resolves only "foo" dynamically:

select parent.parent.foo? from XEvent
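The null-tolerant behavior of a dynamic property chain can be pictured with the following plain-Java sketch over nested Maps: each step yields null instead of failing when a property is absent. DynamicPath is a hypothetical helper, not part of Esper, and, as with a dynamic property, the caller still has to cast the Object result.

```java
import java.util.Map;

// Hypothetical helper: follows a property path through nested Maps and
// returns null as soon as a step is missing or is not itself a Map.
final class DynamicPath {
    static Object get(Object root, String... path) {
        Object current = root;
        for (String name : path) {
            if (!(current instanceof Map)) {
                return null; // absent or not navigable: no exception, just null
            }
            current = ((Map<?, ?>) current).get(name);
        }
        return current;
    }
}
```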

Dynamic properties can be used in combination with indexed and mapped properties as well.

Another approach is to map such Java objects to Map event types: these also allow inheritance, nesting and dynamic properties, are easy to generate programmatically, and make it easier to change the event type at runtime through the runtime configuration API, based on the available metadata or the actually arriving events.

Another possible approach is to create a custom event representation plug-in.

The best approach generally is to use event inheritance (Java object and Map event representations) when possible, nested properties (all event representations) when possible, and strongly-typed properties to keep statements simple and easy to read.

There are a number of options available in the configuration to handle Java classes that may not adhere to JavaBean conventions.

[top]

How do I query nested indexed events? Or more generally, event objects contained in event objects?

Your application may have a parent event that contains multiple subevents. You want to perform aggregation or pattern matching on the subevents plus information on the parent event.

Under the term contained-event selection, the Esper engine can handle events that contain properties that are themselves events. This is useful, for example, when application events are coarse-grained structures and you need to perform bulk operations on the rows of the property graph in an event, with any number of nesting levels.

In this user-provided example, a parent ResponseEvent event contains multiple subevents that each measure an individual operation (a database or JMS operation, for example) that is part of a larger operation, represented by the ResponseEvent event. Each ResponseEvent event has one or more SubEvent objects that provide a subevent type and the number of milliseconds for the operation.

The example here uses Java objects. The example works the same for XML or Map-based events, we are picking a Java event object here for demonstration. See the docs for further examples.

The sample ResponseEvent event and SubEvent definitions are:

public class ResponseEvent {
  private String category;
  private SubEvent[] subEvents;

  public ResponseEvent(String category, SubEvent[] subEvents) {
    this.category = category;
    this.subEvents = subEvents;
  }

  public String getCategory() {
    return category;
  }

  public SubEvent[] getSubEvents() {
    return subEvents;
  }
}

public class SubEvent {
  private long responseTimeMillis;
  private String subEventType;

  public SubEvent(long responseTimeMillis, String subEventType) {
    this.responseTimeMillis = responseTimeMillis;
    this.subEventType = subEventType;
  }

  public long getResponseTimeMillis() {
    return responseTimeMillis;
  }

  public String getSubEventType() {
    return subEventType;
  }
}

The next sample code snippet adds the parent event type to the known types, here via the configuration API, though configuration XML would work just as well:

epService.getEPAdministrator().getConfiguration().addEventType("ResponseEvent", ResponseEvent.class);

This is a sample query to continuously output the average response time per category and subevent-type:

select category, subEventType, avg(responseTimeMillis) as avgTime 
from ResponseEvent[select category, * from subEvents].win:time(1 min) 
group by category, subEventType
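To make the semantics of the contained-event query concrete, the following plain-Java sketch computes the same averages without the one-minute time window: each ResponseEvent is split into one row per SubEvent that carries the parent's category, and the rows are averaged per category and subevent type. The nested classes are simplified stand-ins for the classes above, and the "category|type" key format is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java version of the contained-event aggregation (no time window).
final class ContainedEventAverages {
    static final class Sub {
        final long responseTimeMillis;
        final String subEventType;

        Sub(long responseTimeMillis, String subEventType) {
            this.responseTimeMillis = responseTimeMillis;
            this.subEventType = subEventType;
        }
    }

    static final class Response {
        final String category;
        final List<Sub> subEvents;

        Response(String category, Sub... subEvents) {
            this.category = category;
            this.subEvents = Arrays.asList(subEvents);
        }
    }

    // Key "category|subEventType" -> average response time in milliseconds.
    static Map<String, Double> averages(List<Response> events) {
        Map<String, long[]> sumAndCount = new LinkedHashMap<>();
        for (Response r : events) {
            for (Sub s : r.subEvents) { // one row per contained event
                long[] acc = sumAndCount.computeIfAbsent(r.category + "|" + s.subEventType, k -> new long[2]);
                acc[0] += s.responseTimeMillis;
                acc[1]++;
            }
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, long[]> e : sumAndCount.entrySet()) {
            result.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }
}
```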
[top]