|
Event Stream Intelligence: Esper & NEsper |
Transaction Case Study
The Problem
The use case involves tracking three components of a transaction. Each component comes to the engine as an event with the following fields:
- Transaction ID
- Time stamp
In addition, we have the following extra fields.
In event A:
- Customer ID
In event C:
- Supplier ID (the ID of the supplier that the order was filled through)
We need to take in events A, B and C and produce a single, combined event with the following fields:
- Transaction ID
- Customer ID
- Time stamp from event A
- Time stamp from event B
- Time stamp from event C
What we are doing here is matching the transaction IDs on each event, to form an aggregate event. If all these events were in a relational database, this could be done as a simple SQL join except that with 10,000 events per second, you will need some serious database hardware to do it.
Real-Time Summary Data
Further, we need to produce the following:
- Min,Max,Average total latency from the events (difference in time between A and C) over the past 30 minutes.
- Min,Max,Average latency grouped by (a) customer ID and (b) supplier ID. In other words, metrics on the the latency of the orders coming from each customer and going to each supplier.
Find Missing Events
We need to detect a transaction that did not make it through all three events. In other words, a transaction with events A or B, but not C. Note that, in this case, what we care about is event C. The lack of events A or B could indicate a failure in the event transport and should be ignored. Although the lack of an event C could also be a transport failure, it merits looking into.
Solution
Event Definitions
Events in Esper are represented via regular Java classes that expose JavaBean-style "getter" methods for access to event properties. Since all components of a transaction have a transaction id and timestamp field, we create a base event class with the common fields as below.
public class TxnEventBase {
private String transactionId;
private long timestamp;
public TxnEventBase(String transactionId, long timestamp) {
this.transactionId = transactionId;
this.timestamp = timestamp;
}
public String getTransactionId() {
return transactionId;
}
public long getTimestamp() {
return timestamp;
}
}
Event A in the 3-component transaction can now simply extend the TxnEventBase class and add the customer id field.
public class TxnEventA extends TxnEventBase {
private String customerId;
public TxnEventA(String transactionId, long timestamp, String customerId) {
super(transactionId, timestamp);
this.customerId = customerId;
}
public String getCustomerId() {
return customerId;
}
}
Detecting Combined Events
Now that we have defined the event classes, we can compose an EPL statement to detect combined events in which each component of the transaction is present. We restrict the event matching to the events that arrived within the last 30 minutes.
insert into CombinedEvent(transactionId, customerId, supplierId, latencyAC, latencyBC, latencyAB)
select C.transactionId, customerId, supplierId,
C.timestamp - A.timestamp, C.timestamp - B.timestamp, B.timestamp - A.timestamp
from TxnEventA.win:time(30 minutes) A,
TxnEventB.win:time(30 minutes) B,
TxnEventC.win:time(30 minutes) C
where A.transactionId = B.transactionId and B.transactionId = C.transactionId
This statement uses the insert into syntax to generate a CombinedEvent event stream. The next section uses that stream to derive realtime data on latencies between events.
Compiling Realtime Summary Data
We can now use the CombinedEvent stream to derive realtime summary data.
To derive the minimum, maximum and average total latency from the events (difference in time between A and C) over the past 30 minutes we can use the EPL below.
select min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC from CombinedEvent.win:time(30 minutes)
We can derive this data per customer and supplier id by specifying a group by clause.
select customerId, min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC from CombinedEvent.win:time(30 minutes) group by customerId
Finding Missing Events
An outer join allows us to detect a transaction that did not make it through all three events.
select *
from TxnEventA.win:time(30 minutes) A
full outer join TxnEventC.win:time(60 minutes) C on A.transactionId = C.transactionId
full outer join TxnEventB.win:time(30 minutes) B on B.transactionId = C.transactionId
where C.transactionId is null
When TxnEventA or TxnEventB events leave their respective time windows consisting of the last 30 minutes of events, Esper filters out rows in which no EventC row was found.
Using the API
Setting up the Engine
The Java code snippet below shows how an engine instance can be obtained. The Configuration is used to make the event classes known to the engine. This is optional and makes the EPL statements easier to read. Esper also supports fully-qualified Java class names in EPL statements.
Configuration configuration = new Configuration();
configuration.addEventTypeAlias("TxnEventA", TxnEventA.class.getName());
configuration.addEventTypeAlias("TxnEventB", TxnEventB.class.getName());
configuration.addEventTypeAlias("TxnEventC", TxnEventC.class.getName());
EPServiceProvider epService = EPServiceProviderManager.getProvider("TransactionExample", configuration);
Creating a Statement
The administrative interface on the engine creates statements:
String stmtSupplier = "select supplierId, min(latencyAC) as minLatency, max(latencyAC) as maxLatency, avg(latencyAC) as avgLatency" + "from CombinedEvent.win:time(30 minutes) " + "group by supplierId"; EPStatement bySupplierStatement = epService.getEPAdministrator().createEPL(stmtSupplier); bySupplierStatement.addListener(myListener);
Attaching Listeners
Listeners receive events published by statements and can query the received events. The listener interface receives event entering window(s) as new events (aka. istream or insert stream) and events as they are leaving window(s) as old events (aka. rstream or remove stream).
public class RealtimeSummaryTotalsListener implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
EventBean event = newEvents[0];
log.debug(" Totals minAC=" + event.get("minLatencyAC") +
" maxAC=" + event.get("maxLatencyAC") +
" avgAC=" + event.get("avgLatencyAC"));
}
}
Sending Events
Finally, events are sent for processing by the engine via the runtime interface:
TxnEventA a = new TxnEventA("txnid1", 1, "customerId1");
epService.getEPRuntime().sendEvent(event);
Running the Example
The complete example code can be found in the "examples" folder of the distribution in package com.espertech.esper.example.transaction. The example contains a transaction simulator that can be invoked from the command line. The readme file in the "examples/etc" folder contains build instructions and command line parameters to run the transaction simulator from the command line. You do not need to perform a build of the Esper engine to run the example, the Esper jar file has been included in the distribution.
Please consult the examples section in the reference documentation for more information.
About Esper and NEsper
NewsAbout Esper for Java
About NEsper for .NET
License
Terms of Use
Tutorials and Case Studies
TutorialQuick Start
Short Case Study
Longer Case Study
Solution Patterns
OnJava Article
TheServerSide Article
Past Presentations
FAQ for Java
FAQ for .NET
Additional Examples
Technology Links
Esper for Java
DownloadChange History
Documentation
Reporting Issues
Building
On Performance
NEsper for .NET
DownloadChange History
Documentation
Reporting Issues
Building
The Esper/NEsper Team
How To ContributeMailing Lists
Roadmap
Source Repository
Last Published: May 11, 2008
Version: 2.1.0
