Event Stream Intelligence: Esper & NEsper
Solution Patterns
General
- How do I measure the rate of arrival of events in a given time period?
- How do I measure the rate of arrival of events in a given time period per another category?
- How do I correlate events arriving in 2 or more streams?
- How do I find missing events arriving in 2 or more streams that are correlated?
- How do I correlate events arriving out-of-order?
- How do I use patterns to correlate events arriving in-order or out-of-order?
- How do I correlate 3 events in a time window in which events have similar properties?
- How do I keep a separate window of events per category and compute aggregates for each category's window?
- How do I use results of one statement in another statement?
- How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?
- How do I detect the absence of an event?
- How do I detect the absence of an event and the presence of an event arriving too late?
- How do I report at a regular interval without any incoming events?
- How do I detect something really complex, like a triple-bottom pattern?
- Can I use an UpdateListener to listen to events and the iterator-based pull-API together?
- How do I stop an insert after a period of time?
- Can I use a regular expression (regexp) within a filter?
- How can I remove duplicates? What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?
- How do I detect a specific sequence of events and get all events that take part in this sequence?
- How to implement a sell trailing stop order?
General
- How do I measure the rate of arrival of events in a given time period?
The time batch window of 1 second as shown below produces an event each second with the count of the number of events in the batch.
select count(*) as cnt from MarketDataEvent.win:time_batch(1 second)
- How do I measure the rate of arrival of events in a given time period per another category?
We can solve this problem by grouping the events in the window per the category. The below example uses 'feed' as the category.
select feed, count(*) as cnt from MarketDataEvent.win:time_batch(1 second) group by feed
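To make the batching behaviour concrete, here is a minimal plain-Python sketch of what the grouped time-batch statement computes. It does not use the Esper API. The function name `batch_counts_per_feed` and the sample events are hypothetical, and for simplicity the sketch aligns batches to multiples of the batch length and omits empty batches, which is an assumption rather than a statement about how the engine schedules its batches.

```python
from collections import Counter


def batch_counts_per_feed(events, batch_seconds=1.0):
    """Count events per feed in consecutive fixed-length batches.

    events: iterable of (timestamp_seconds, feed) tuples.
    Returns a time-ordered list of (batch_start, {feed: count}).
    Assumption: batches are aligned to multiples of batch_seconds,
    and batches without events are left out.
    """
    batches = {}
    for timestamp, feed in events:
        # Index of the batch this event falls into.
        index = int(timestamp // batch_seconds)
        batches.setdefault(index, Counter())[feed] += 1
    return [(index * batch_seconds, dict(counts))
            for index, counts in sorted(batches.items())]


# Hypothetical sample data: (arrival time in seconds, feed name).
sample_events = [(0.1, "A"), (0.5, "B"), (0.9, "A"), (1.2, "A"), (2.7, "B")]
per_second = batch_counts_per_feed(sample_events)
for batch_start, counts in per_second:
    print(batch_start, counts)
```

Each printed row corresponds to one output of the statement: one count per feed for each one-second batch.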
- How do I correlate events arriving in 2 or more streams?
The join of event streams looks very similar to joins in SQL. To bind data in the streams together, across streams, we identify keys to join on.
The below example specifies the 'accountNumber' field as the only join key. In this example we hold the last 30 seconds of events for each stream.
select fraud.accountNumber as accntNum, withdraw.amount as amount
from FraudWarningEvent.win:time(30 sec) as fraud,
     WithdrawalEvent.win:time(30 sec) as withdraw
where fraud.accountNumber = withdraw.accountNumber
- How do I find missing events arriving in 2 or more streams that are correlated?
As in SQL we can use outer joins to generate a result even if one or more of the correlated events are not found in a stream. Usually we want to generate the result after a certain time or after a certain number of events have been received, indicating that a correlated event is truly missing.
In this example we are looking for a withdrawal event without a login event for the same account number after 60 seconds.
We join withdrawal events with login events looking for login events that do not exist (account number is null). We want to get notified as these events leave the 60-second time window.
select rstream withdraw.accountNumber as accntNum, withdraw.amount as amount
from WithdrawalEvent.win:time(60 sec) as withdraw
     left outer join LoginEvent.win:time(60 sec) as login
     on login.accountNumber = withdraw.accountNumber
where login.accountNumber is null
- How do I correlate events arriving out-of-order?
Let's assume we have three different types of events, all having a common attribute 'exchangeId'. Let's call the events start, finished and aborted.
Let's expect exactly one start event and multiple finished or aborted events for every exchangeId. The start event may arrive after the finished or aborted events, but all of them arrive within, say, 30 seconds per exchangeId.
There are multiple possible answers to this problem. One solution is an outer join using time windows that looks at the remove stream: we care about the composite events at the moment a start event leaves the time window after 30 seconds, by which time the other events for the same exchangeId have accumulated.
select rstream * from StartEvent.win:time(30 sec) start
     left outer join AbortedEvent.win:time(30 sec) abort on abort.exchangeId = start.exchangeId
     left outer join FinishedEvent.win:time(30 sec) finished on finished.exchangeId = start.exchangeId
In the example above, every time a StartEvent leaves the time window it takes with it all aborted and finished events. The abort property will be null if no abort occurred.
Another solution is shown next using patterns to detect out-of-order events.
- How do I use patterns to correlate events arriving in-order or out-of-order?
The prior discussion focused on 3 kinds of events: start, finished and aborted.
A second possible solution can be found in using patterns. If one doesn't really care about processing the multiple aborted events and simply wants to get a failed or finished indication when the first aborted event or finished event is encountered, then a simpler approach is to specify a pattern for each interesting combination, some of which are shown below.
select * from pattern [every s=StartEvent -> a=AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec)]
The above pattern simply looks for aborted transactions and reacts to the first aborted event coming in after a start event. The pattern to detect finished transactions, i.e. where no abort occurred, should look about like this:
select * from pattern [every s=StartEvent -> (f=FinishedEvent(exchangeId = s.exchangeId) where timer:within(30 sec) and not AbortedEvent(exchangeId = s.exchangeId) where timer:within(30 sec))]
To detect out-of-order events, the pattern can certainly be reversed:
select * from pattern [every a=AbortedEvent -> s=StartEvent(exchangeId = a.exchangeId) where timer:within(30 sec)]
- How do I correlate 3 events in a time window in which events have similar properties?
My application needs to match 3 events which occur within a time window where 3 different users submit trades with similar properties. The properties that must have the same value for each of the 3 events matched are currency pair and direction. The pattern should match only if all 3 events have a different user. The time window should be 10 minutes long.
The pattern that solves this problem is shown below. It uses the timer:within pattern guard to limit the lifetime of each active sub-expression to 10 minutes.
every trade1=Trade(userId in ('U1000','U1001','U1002')) ->
  (trade2=Trade(userId in ('U1000','U1001','U1002') and userId != trade1.userId
                and ccypair = trade1.ccypair and direction = trade1.direction) ->
   trade3=Trade(userId in ('U1000','U1001','U1002') and userId not in (trade1.userId, trade2.userId)
                and ccypair = trade1.ccypair and direction = trade1.direction)
  ) where timer:within(10 min)
- How do I keep a separate window of events per category and compute aggregates for each category's window?
I have one or more categories and for each of these categories I need to keep a separate window of events.
In the statement below we have stock tick events for which we want to compute the average price of the last 10 stock tick events per symbol. Notice we are not using the last 10 events overall, we are looking at the last 10 events per symbol.
select symbol, avg(price) as avgPrice from StockTick.std:groupby(symbol).win:length(10) group by symbol
We can also specify multiple categories:
select symbol, location, avg(price) as avgPrice from StockTick.std:groupby(symbol,location).win:length(10) group by symbol, location
Let's consider another possible way of using a separate window of events per category. In some use cases we may need to compute not an average per group, but an average over all groups that considers only the last N events per group. This can be accomplished by leaving the group-by clause off. Now the engine computes the average price over all symbols, considering only the last 10 events per symbol:
select symbol, location, avg(price) as avgPrice from StockTick.std:groupby(symbol).win:length(10)
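The difference between the two averages can be illustrated with a small plain-Python sketch. It does not use the Esper API. The class name `PerSymbolWindow` and its methods are hypothetical, and the window size is reduced to 2 in the usage example only to keep the numbers easy to follow.

```python
from collections import defaultdict, deque


class PerSymbolWindow:
    """Keeps the last `size` prices separately for each symbol."""

    def __init__(self, size=10):
        # One bounded window per symbol; old prices fall out automatically.
        self.windows = defaultdict(lambda: deque(maxlen=size))

    def add(self, symbol, price):
        self.windows[symbol].append(price)

    def avg_per_symbol(self, symbol):
        """Average over one symbol's window (the group-by variant)."""
        window = self.windows.get(symbol)
        if not window:
            return None
        return sum(window) / len(window)

    def avg_over_all(self):
        """Average over all windows together (group-by clause left off)."""
        prices = [p for window in self.windows.values() for p in window]
        if not prices:
            return None
        return sum(prices) / len(prices)


# Hypothetical ticks with a window of 2 events per symbol.
ticks = PerSymbolWindow(size=2)
for symbol, price in [("IBM", 10), ("IBM", 20), ("IBM", 30), ("MSFT", 100)]:
    ticks.add(symbol, price)

ibm_avg = ticks.avg_per_symbol("IBM")   # uses only the last two IBM prices
overall_avg = ticks.avg_over_all()      # last two IBM prices plus one MSFT price
```

In this run the first IBM price has already left the IBM window, so it contributes to neither average.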
- How do I use results of one statement in another statement?
Use the insert into syntax to use the events generated by one statement as input to another statement.
We can first compute the number of events arriving within 1 second, then use that number to perform additional aggregation. Here we compute for the last 30 seconds the maximum and minimum rate per feed.
insert into TicksPerSecond select feed, count(*) as cnt from MarketDataEvent.win:time_batch(1 second) group by feed
select feed, max(cnt) as maxCount, min(cnt) as minCount from TicksPerSecond.win:time(30 sec) group by feed
- How do I reduce the rate of event output by my statement? How do I get frequent but not continuous results?
Use output rate limiting to stabilize or reduce the rate at which rows are output from a query, by outputting rows at a specified time or row-based interval.
The example below limits the otherwise continuous output to an output row every 5 seconds. The output contains the feed and average volume per feed of the last 60 seconds of market data events.
select feed, avg(volume) as cnt from MarketDataEvent.win:time(60 sec) group by feed output every 5 seconds
- How do I detect the absence of an event?
Use a pattern to detect the absence of an event. The below pattern fires if an event A is not followed by an event B within 10 seconds.
select * from pattern [every EventA -> (timer:interval(10 sec) and not EventB)]
Outer joins are also a good way to detect missing events. A solution with an outer join was discussed above.
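The timing logic of the absence pattern can be sketched in plain Python, without the Esper API. The function name `absent_b_alerts` and the sample events are hypothetical. The sketch assumes that a timer expiring at exactly the arrival time of a B event fires first, and that time keeps advancing after the last input event so that pending timers still fire; both are simplifying assumptions, not statements about engine behaviour.

```python
def absent_b_alerts(events, timeout=10.0):
    """Return the times at which "A not followed by B" would be reported.

    events: time-ordered list of (timestamp_seconds, name), name is "A" or "B".
    Every A starts its own timer; any B cancels all timers still running.
    """
    alerts = []
    pending = []  # arrival times of A events whose timer is still running
    for timestamp, name in events:
        # Fire every timer that expired at or before this event's arrival.
        still_waiting = []
        for a_time in pending:
            if a_time + timeout <= timestamp:
                alerts.append(a_time + timeout)
            else:
                still_waiting.append(a_time)
        pending = still_waiting
        if name == "A":
            pending.append(timestamp)
        elif name == "B":
            pending = []  # B arrived in time for all waiting A events
    # Assumption: time continues, so the remaining timers expire eventually.
    alerts.extend(a_time + timeout for a_time in pending)
    return alerts


# Hypothetical input: the A at 0 is answered by the B at 4,
# the A events at 20 and 25 are not answered within 10 seconds.
sample = [(0, "A"), (4, "B"), (20, "A"), (25, "A"), (40, "B")]
alert_times = absent_b_alerts(sample)
```

Because each A event starts its own timer, two unanswered A events produce two separate reports, which mirrors the role of the every operator in the pattern.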
- How do I detect the absence of an event and the presence of an event arriving too late?
Let's say we want to detect 2 situations:
a) A Down event is not followed by an Up event, i.e. the Up event for the same equipment id does not come in within 1 minute.
b) A Down event is followed by an Up event for the same equipment id, but the Up event arrives 30 seconds or more after the Down event.
select * from pattern [
  every down=MyEvent(text='down') ->
  (
    (timer:interval(1 min) and not up=MyEvent(text='Up', equipmentId=down.equipmentId))
    or
    ( (timer:interval(30 sec) and not MyEvent(text='Up', equipmentId=down.equipmentId))
      -> up=MyEvent(text='Up', equipmentId=down.equipmentId) where timer:within(30 seconds)
    )
  )]
- How do I report at a regular interval without any incoming events?
Let's say we want to have our listener get invoked every 5 seconds, and select the last value, if any, from a stream.
select (select price from MarketData.std:lastevent()) as price from pattern [every timer:interval(5 sec)]
The pattern fires every 5 seconds causing the sub-select to take place, returning null if no MarketData events have come in, or returning the price column of the last MarketData event.
- How do I detect something really complex, like a triple-bottom pattern?
The triple-bottom pattern comes from the world of stock trading and is described in detail in Triple-Bottom Pattern.
The problem can be broken down: First, how does one identify bottom price points among a stream of market data events? Second, once the individual bottom price points are identified, how does one detect an occurrence of 3 bottom price points, whose value is within approximation of each other, and that are spaced out over time in a pattern?
The first problem is an event stream processing problem, I believe. The stream of events is market data that contains price points for the NIFTY index over time. I'll attempt to define a bottom price point as follows: If the average price for the last 5 days is 15% lower than the average price over a period of say 60 days, then the minimum price during those 5 days is a single bottom price point. Of course the number of days and percentages are parameters to figure out and get right.
-- The query to determine the average price for the last 60 days:
insert into AvgPriceLast60Days select avg(price) as avgPrice from MarketData.win:time(60 days) output every 10 minutes
-- The query to determine the average price for the last 5 days:
insert into AvgPriceLast5Days select avg(price) as avgPrice, min(price) as minPrice from MarketData.win:time(5 days) output every 10 minutes
-- Compare the last average prices for each:
insert into BottomPriceEvent select minPrice as bottomPrice from AvgPriceLast60Days.std:lastevent() as LastAvg60Days, AvgPriceLast5Days.std:lastevent() as LastAvg5Days where LastAvg60Days.avgPrice * 0.85 > LastAvg5Days.avgPrice output first every 1 day
The last statement populates the "BottomPriceEvent" event stream as a stream of higher-level events in which each event represents a bottom price point.
The second part of the problem requires detecting 3 bottom price points whose values are within a given range of each other, and that have a certain temporal relationship with each other. Let's assume that the bottom price points should be within 5% of each other. Let's also assume we are looking for bottom price points spaced at least 10 days apart from each other, but within 30 days of the prior bottom price point.
-- The pattern to detect the triple-bottom:
insert into TripleBottomPattern select * from pattern [every a=BottomPriceEvent
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)
  -> timer:interval(10 days)
  -> BottomPriceEvent(bottomPrice between 0.95*a.bottomPrice and 1.05*a.bottomPrice) where timer:within(30 days)]
Finally, the resulting TripleBottomPattern event stream created by the last statement carries the higher-level complex events, each indicating that a triple-bottom pattern has been detected.
An additional interesting problem is that the stream and pattern queries are rather long-running continuous queries, since they need to run over days and months. That may require persisting events, and/or using simulated time by playing back past events into the engine.
- Can I use an UpdateListener to listen to events and the iterator-based pull-API together?
UpdateListener and iterator can be used together. An update listener implementation can also itself query the same statement or another statement's iterator, as the engine guarantees the iterators are up-to-date before calling update listeners even across statements.
The iterator can also be used to query for no matches. The iterator should have minimal overhead, depending on the type of statement iterated on; for pattern statements specifically, the overhead is negligible.
- How do I stop an insert after a period of time?
Assume we only want to receive the first 10 minutes of an incoming event stream and then stop receiving data from that stream.
The timer:within pattern guard function can serve here to stop the stream after a given amount of time, as the next statement shows:
insert into PackageNotifyEvent select myevent.uid as uid, 'HOME' as loc, 'ARRIVED' as status from pattern[every myevent=TrackingEvent where timer:within(10 min)]
- Can I use a regular expression (regexp) within a filter?
Yes, a regular expression can be used as part of a filter expression. Pretty much any expression is allowed within event filter expressions other than aggregation functions and the previous or prior function.
select * from pattern[every myevent=TrackingEvent(event.uid regexp '^a-b-.*' and event.lat in [40.1:40.2] and event.lon in [-74.1:-74.0])]
- How can I remove duplicates? What if I want to form pairs of events where each pair is a unique combination of the latest event of two streams?
I'm trying to detect pairs of events correlated by a "type" value and a unique "device" value. Then based on this pair of events, I'd like to find the maximum "measurement" value and the corresponding "confidence" for the one with the max "measurement" value. Here's my event object:
class Sensor {
    long id;
    String type;
    String device;
    Double measurement;
    Double confidence;
}
I'll know in advance the set of possible device values, but the Sensor events can happen in any order, and two or more Sensor events for the same device might occur before a Sensor event for the other device occurs. Thus if a Sensor event for the same device occurs before a Sensor event for the other device, then the second Sensor event would replace the first Sensor event for that device of the same type. In other words, the last event for a particular device of a given type is the one that should be used in the calculation of the maximum.
The computation would be the maximum value of the 'measurement' property between A and B. Also, the 'confidence' value would correspond to the value from the event with the maximum 'measurement' property.
A sample input and output:
Sensor[id=1,type='Temperature',device='A',measurement=51,confidence=94.5]
Sensor[id=2,type='Temperature',device='A',measurement=57,confidence=95.5]
Sensor[id=3,type='Humidity',device='B',measurement=29,confidence=67.5]
Sensor[id=4,type='Temperature',device='B',measurement=55,confidence=88.0]
Sensor[id=5,type='Temperature',device='B',measurement=65,confidence=85.0]
Sensor[id=6,type='Temperature',device='B',measurement=49,confidence=87.0]
Sensor[id=7,type='Temperature',device='A',measurement=51,confidence=99.5]
For output, one would expect the following:
// First output event pairs events with id=2 and id=4 and chooses the values from id=2
MaxReading[type='Temperature',device='A',measurement=57,confidence=95.5]
// Second output event pairs events with id=6 and id=7, since the event with id=6
// replaces the one with id=5, the event with id=5 is never compared against the
// event with id=7
MaxReading[type='Temperature',device='A',measurement=51,confidence=99.5]
One possible solution builds pairs of events using a join:
// Create pairs of device A and B events
insert into Pair select * from Sensor(device='A').std:lastevent() as a, Sensor(device='B').std:lastevent() as b where a.type = b.type
From the resulting stream we remove those pairs in which either event was seen before, leaving unique pairs:
// Fast way of declaring the stream type, could also be done via config API or XML
insert into PairDuplicatesRemoved select * from Pair(1=2)
// Remove duplicate pairs where either sensor event is from the prior pair
insert into PairDuplicatesRemoved select * from Pair
where a.id != (select a.id from PairDuplicatesRemoved.std:lastevent())
  and b.id != (select b.id from PairDuplicatesRemoved.std:lastevent())
Last, select the maximum measurement value between the pair of sensor events and the corresponding confidence and device:
select a.type,
       max(a.measurement, b.measurement) as measurement,
       case when a.measurement > b.measurement then a.confidence else b.confidence end as confidence,
       case when a.measurement > b.measurement then a.device else b.device end as device
from PairDuplicatesRemoved
- How do I detect a specific sequence of events and get all events that take part in this sequence?
I have events coming from different sources. They have 2 states: success and failure, and they have a source. I would like to create a query to know when there are, for example, for a specific source 5 failure events followed by a success one. Of course as soon as there's a success event, the previous failure events shouldn't count anymore.
There needs to be a maximum time to wait for a success event to arrive, since we don't want to keep looking without end for a matching success event. Let's say the maximum wait is 5 minutes, so we simply drop failure events after 5 minutes.
Let's look at some samples: F5 means 5th failure event, S3 means 3rd success event. Also let's say we only need 5 failure events before a success one to have an alert.
Case1 - If within 5 minutes I have (from the same source)
F1 F2 F3 F4 F5 S1
then I want to throw an alert. The alert must know about those events, meaning I would like the listener to get those events.
Case2 - If within 5 minutes I have (from the same source)
F1 F2 F3 F4 F5 F6 F7 F8 S1
then I would have an alert knowing about F4 to F8 and S1.
Case3 - If within 5 minutes I have (from the same source)
F1 F2 F3 F4 S1 F5 F6 S2
then no alert would be emitted since once S1 arrives there were only 4 failure events.
Case4 - still from the same source
F1 F2 F3 F4 (then 10 minutes later) F5 S1
then of course no alert as all the events aren't within the 5 minutes window.
Case5 - If within 5 minutes (this time we have the sources a and b)
F1a F1b F2a F3a F2b F4a S1b F5a F3b S1a
No alert will be created when S1b arrives because there are only 2 failures for b. When S1a arrives an alert is created because we have F1a to F5a before, with no success concerning a in between.
Solution: Since we are looking for a very specific sequence of events, a pattern is the best way to go. We want to make sure the pattern subexpressions end when a success event arrives, and this can be accomplished via the not-operator. We also want to limit the expression to live 5 minutes from the first failure event:
every a=F -> (
     (b=F(src=a.src) and not S(src=a.src))
  -> (c=F(src=a.src) and not S(src=a.src))
  -> (d=F(src=a.src) and not S(src=a.src))
  -> (e=F(src=a.src) and not S(src=a.src))
  -> (f=S(src=a.src) and not F(src=a.src))
) where timer:within(5 min)
This solution works for all cases, including case 2. Even though the pattern looks for only 5 events in a row, it looks at any 5 subsequent events for the same source, matching case 2 for events F4 to F8 and S1 (the active expressions that include F1, F2 and F3 end when F6, F7 and F8 arrive).
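The five cases can be checked against a plain-Python sketch of the intended matching rule: per source, a success raises an alert only if it is directly preceded by five failures and the first of those failures is at most 5 minutes old. This is an illustration of the rule, not the Esper API. The names `detect_alerts` and `spaced` are hypothetical, the 10-second spacing of the sample events is an assumption, and treating an age of exactly 5 minutes as still inside the window is also an assumption.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60   # maximum age of the first failure in a match
REQUIRED_FAILURES = 5     # failures that must directly precede a success


def detect_alerts(events):
    """events: time-ordered (timestamp_seconds, source, label) tuples,
    where label starts with "F" (failure) or "S" (success).
    Returns one list of labels per alert: five failures plus the success.
    """
    # Per source, only the most recent failures since the last success.
    recent = defaultdict(lambda: deque(maxlen=REQUIRED_FAILURES))
    alerts = []
    for timestamp, source, label in events:
        failures = recent[source]
        if label.startswith("F"):
            failures.append((timestamp, label))
            continue
        # A success: alert only on five fresh failures, then start over.
        matched = (len(failures) == REQUIRED_FAILURES
                   and timestamp - failures[0][0] <= WINDOW_SECONDS)
        if matched:
            alerts.append([name for _, name in failures] + [label])
        failures.clear()
    return alerts


def spaced(labels, source="a", step=10):
    """Helper: place labels `step` seconds apart for a single source."""
    return [(i * step, source, label) for i, label in enumerate(labels)]


case1 = detect_alerts(spaced(["F1", "F2", "F3", "F4", "F5", "S1"]))
case2 = detect_alerts(spaced(["F1", "F2", "F3", "F4", "F5", "F6", "F7", "F8", "S1"]))
case3 = detect_alerts(spaced(["F1", "F2", "F3", "F4", "S1", "F5", "F6", "S2"]))
case4 = detect_alerts([(0, "a", "F1"), (10, "a", "F2"), (20, "a", "F3"),
                       (30, "a", "F4"), (630, "a", "F5"), (640, "a", "S1")])
case5 = detect_alerts([(0, "a", "F1a"), (10, "b", "F1b"), (20, "a", "F2a"),
                       (30, "a", "F3a"), (40, "b", "F2b"), (50, "a", "F4a"),
                       (60, "b", "S1b"), (70, "a", "F5a"), (80, "b", "F3b"),
                       (90, "a", "S1a")])
```

Cases 1, 2 and 5 each yield one alert, while cases 3 and 4 yield none, which agrees with the expectations listed for the five cases.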
- How to implement a sell trailing stop order?
This is an example out of the stock trading domain, in which incoming events are market prices. A sell trailing stop order is a technique that is designed to allow an investor to specify a limit on the maximum possible loss, without setting a limit on the maximum possible gain.
A sell trailing stop order sets the lower boundary (stop) price at a fixed amount below the current market price with an attached "trailing" amount. As the market price rises, the stop price rises by the trail amount, but if the stock price falls, the stop price doesn't change, and a market order is submitted when the stop price (lower boundary) is hit.
Assume that the market price is 700 at the time of placing the trailing stop order. Assume that the stop price is 600. If the price goes to 703, the stop price must be updated to 603. If the price drops to 682, the trigger is still 603.
The solution considers the maximum market price since statement start time, compared against the current market price:
// since release 2.0
select * from Quote(symbol='GOOG') where price = max((select max(lastPx) as lastPx from Quote(symbol='GOOG')) - 100, 600)
// or for release 1.x
insert into MaxPrice(lastPx) select max(lastPx) as lastPx from Quote(symbol='GOOG')
select * from Quote(symbol='GOOG') where price = max((select lastPx from MaxPrice.std:lastevent()) - 100, 600)
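The stop-price arithmetic from the worked numbers (700, 703, 682) can be traced with a small plain-Python sketch that does not use the Esper API. The function name `trailing_stop` and the sample price series are hypothetical. The sketch assumes the order triggers when the price is at or below the stop price, which is a slightly broader condition than an exact equality test.

```python
def trailing_stop(prices, trail=100, initial_stop=600):
    """Find the first price that hits a sell trailing stop.

    prices: market prices in arrival order.
    The stop is the highest price seen so far minus `trail`,
    but never lower than `initial_stop`.
    Returns (index, price, stop) for the triggering price, or None.
    Assumption: the stop is hit when price <= stop.
    """
    highest = None
    for index, price in enumerate(prices):
        # The running maximum only moves up, so the stop never moves down.
        highest = price if highest is None else max(highest, price)
        stop = max(highest - trail, initial_stop)
        if price <= stop:
            return index, price, stop
    return None


# Hypothetical price series following the numbers in the text:
# 700 sets the stop to 600, 703 raises it to 603, 682 leaves it at 603.
triggered = trailing_stop([700, 703, 682, 650, 603])
not_triggered = trailing_stop([700, 710, 705])
```

In the first series the final price of 603 reaches the stop of 603, so the order would trigger there; in the second series the price never falls to the stop.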
Last Published: May 11, 2008
Version: 2.1.0
